Skip to main content

reifydb_store_multi/store/
drop.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Drop operation implementation for cleaning up versioned entries.
5//!
6//! The Drop operation completely erases versioned entries from storage without
7//! writing tombstones or generating CDC events. It's used for internal cleanup
8//! operations like maintaining single-version semantics for flow node state.
9
10use reifydb_core::common::CommitVersion;
11use reifydb_type::util::cowvec::CowVec;
12
13use crate::{
14	Result,
15	tier::{EntryKind, TierStorage},
16};
17
18/// Information about an entry to be dropped.
19#[derive(Debug, Clone)]
20pub struct DropEntry {
21	/// The logical key to delete
22	pub key: CowVec<u8>,
23	/// The specific version to delete
24	pub version: CommitVersion,
25	/// The size of the value being dropped (for metrics tracking)
26	pub value_bytes: u64,
27}
28
29/// Find versioned keys to drop based on constraints.
30///
31/// # Arguments
32/// - `storage`: The storage backend to scan
33/// - `table`: The table containing the keys
34/// - `key`: The logical key (without version suffix)
35/// - `up_to_version`: If Some(v), candidate versions where version < v
36/// - `keep_last_versions`: If Some(n), protect n most recent versions from being dropped
37/// - `pending_version`: Version being written in the same batch (to avoid race)
38pub(crate) fn find_keys_to_drop<S: TierStorage>(
39	storage: &S,
40	table: EntryKind,
41	key: &[u8],
42	up_to_version: Option<CommitVersion>,
43	keep_last_versions: Option<usize>,
44	pending_version: Option<CommitVersion>,
45) -> Result<Vec<DropEntry>> {
46	// Get all versions of this key directly (bypasses MVCC resolution)
47	let all_versions = storage.get_all_versions(table, key)?;
48
49	// Collect all versions with their value sizes
50	let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
51		.into_iter()
52		.map(|(version, value)| {
53			let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
54			(version, value_bytes)
55		})
56		.collect();
57
58	// Include pending version if provided (version being written in current batch)
59	// This prevents a race where Drop scans storage before Set is written
60	if let Some(pending_ver) = pending_version {
61		// Check if pending version already exists (avoid duplicates)
62		if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
63			// Add a placeholder entry for the pending version
64			// value_bytes=0 is fine since this entry will never be dropped (it's the newest)
65			versioned_entries.push((pending_ver, 0));
66		}
67	}
68
69	// Sort by version descending (most recent first) for keep_last_versions logic
70	versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
71
72	// Determine which entries to drop
73	let mut entries_to_drop = Vec::new();
74	let key_cow = CowVec::new(key.to_vec());
75
76	for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
77		// Use AND logic for combined constraints:
78		// - keep_last_versions protects the N most recent versions
79		// - up_to_version only drops versions < threshold IF not protected
80		let should_drop = match (up_to_version, keep_last_versions) {
81			// Both None: drop everything
82			(None, None) => true,
83			// Only version constraint: drop if version < threshold
84			(Some(threshold), None) => entry_version < threshold,
85			// Only keep constraint: drop if beyond keep count
86			(None, Some(keep_count)) => idx >= keep_count,
87			// Both constraints (AND): drop only if BOTH say drop
88			// This ensures keep_last_versions always protects N versions
89			(Some(threshold), Some(keep_count)) => entry_version < threshold && idx >= keep_count,
90		};
91
92		if should_drop {
93			// Never drop the pending version (it's being written in this batch)
94			if Some(entry_version) == pending_version {
95				continue;
96			}
97
98			entries_to_drop.push(DropEntry {
99				key: key_cow.clone(),
100				version: entry_version,
101				value_bytes,
102			});
103		}
104	}
105
106	Ok(entries_to_drop)
107}
108
#[cfg(test)]
pub mod tests {
	use std::collections::HashMap;

	use super::*;
	use crate::hot::storage::HotStorage;

	/// Create versioned test entries for a key.
	///
	/// Each version stores a one-byte value (`v as u8`).
	fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
		for v in versions {
			let entries = vec![(CowVec::new(key.to_vec()), Some(CowVec::new(vec![*v as u8])))];
			storage.set(CommitVersion(*v), HashMap::from([(table, entries)])).unwrap();
		}
	}

	/// Extract version numbers from the drop entries
	fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
		entries.iter().map(|e| e.version.0).collect()
	}

	#[test]
	fn test_drop_all_versions() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();

		assert_eq!(to_drop.len(), 5);
		let versions = extract_dropped_versions(&to_drop);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(versions.contains(&20));
		assert!(versions.contains(&100));
	}

	#[test]
	fn test_drop_up_to_version() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// Drop versions < 10 (should drop 1, 5)
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 2);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(!versions.contains(&10));
		assert!(!versions.contains(&20));
		assert!(!versions.contains(&100));
	}

	#[test]
	fn test_drop_up_to_version_boundary() {
		// Test exact boundary - version == threshold should NOT be dropped
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[9, 10, 11]);

		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 1);
		assert!(versions.contains(&9)); // Only 9 < 10
	}

	#[test]
	fn test_keep_last_n_versions() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted descending: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// Keep 2 most recent (100, 20), drop others (10, 5, 1)
		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 3);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(!versions.contains(&20));
		assert!(!versions.contains(&100));
	}

	#[test]
	fn test_keep_more_than_exists() {
		// Keep 10 but only 3 exist - should drop nothing
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(10), None).unwrap();

		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_keep_zero_versions() {
		// Keep 0 = drop all
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(0), None).unwrap();

		assert_eq!(to_drop.len(), 3);
	}

	#[test]
	fn test_keep_one_version() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// Keep only most recent (100)
		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 4);
		assert!(!versions.contains(&100)); // Most recent kept
	}

	#[test]
	fn test_combined_constraints_keep_protects() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted desc: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// up_to_version=15 would drop: 1, 5, 10 (all < 15)
		// keep_last_versions=3 protects: 100, 20, 10 (indices 0, 1, 2)
		// Combined (AND logic): drop only if (version < 15) AND (idx >= 3)
		// - 100: idx=0, 100 >= 15 → KEEP
		// - 20: idx=1, 20 >= 15 → KEEP
		// - 10: idx=2, 10 < 15 BUT idx < 3 → KEEP (protected!)
		// - 5: idx=3, 5 < 15 AND idx >= 3 → DROP
		// - 1: idx=4, 1 < 15 AND idx >= 3 → DROP
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(15)), Some(3), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 2); // Only 1 and 5 dropped
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(!versions.contains(&10)); // Protected by keep_last=3
		assert!(!versions.contains(&20));
		assert!(!versions.contains(&100));
	}

	#[test]
	fn test_combined_constraints_version_restricts() {
		// Test case where up_to_version is more restrictive than keep_last
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted desc: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// up_to_version=3 would drop: only 1 (1 < 3)
		// keep_last_versions=2 protects: 100, 20 (indices 0, 1)
		// Combined (AND logic): drop only if (version < 3) AND (idx >= 2)
		// - 100: idx=0 → KEEP (protected)
		// - 20: idx=1 → KEEP (protected)
		// - 10: idx=2, 10 >= 3 → KEEP (version constraint not met)
		// - 5: idx=3, 5 >= 3 → KEEP (version constraint not met)
		// - 1: idx=4, 1 < 3 AND idx >= 2 → DROP
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(3)), Some(2), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 1); // Only 1 dropped
		assert!(versions.contains(&1));
	}

	#[test]
	fn test_combined_constraints_both_aggressive() {
		// Both constraints are aggressive
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted desc: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// up_to_version=50 would drop: 1, 5, 10, 20 (all < 50)
		// keep_last_versions=1 protects: only 100 (index 0)
		// Combined (AND logic): drop only if (version < 50) AND (idx >= 1)
		// - 100: idx=0 → KEEP (protected)
		// - 20: idx=1, 20 < 50 AND idx >= 1 → DROP
		// - 10: idx=2, 10 < 50 AND idx >= 1 → DROP
		// - 5: idx=3, 5 < 50 AND idx >= 1 → DROP
		// - 1: idx=4, 1 < 50 AND idx >= 1 → DROP
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(50)), Some(1), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 4); // All except 100
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(versions.contains(&20));
		assert!(!versions.contains(&100)); // Protected
	}

	// ==================== Edge cases ====================

	#[test]
	fn test_empty_storage() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"nonexistent";

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_single_version_drop_all() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[42]);

		// Drop all
		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
		assert_eq!(to_drop.len(), 1);
	}

	#[test]
	fn test_single_version_keep_one() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[42]);

		// Keep 1 - should drop nothing
		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_different_keys_isolated() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;

		setup_versioned_entries(&storage, table, b"key_a", &[1, 2, 3]);
		setup_versioned_entries(&storage, table, b"key_b", &[10, 20, 30]);

		// Drop all versions of key_a
		let to_drop = find_keys_to_drop(&storage, table, b"key_a", None, None, None).unwrap();

		assert_eq!(to_drop.len(), 3);
		// Verify all dropped keys are for key_a, not key_b
		for entry in &to_drop {
			assert_eq!(entry.key.as_slice(), b"key_a");
		}
	}

	#[test]
	fn test_up_to_version_zero() {
		// up_to_version=0 means drop nothing (no versions < 0)
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(0)), None, None).unwrap();

		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_up_to_version_max() {
		// up_to_version=MAX means drop all (all versions < MAX)
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, u64::MAX - 1]);

		let to_drop =
			find_keys_to_drop(&storage, table, key, Some(CommitVersion(u64::MAX)), None, None).unwrap();

		assert_eq!(to_drop.len(), 3);
	}

	#[test]
	fn test_drop_entry_metadata() {
		// Every entry carries the logical key and the stored value size
		// (setup_versioned_entries writes a one-byte value per version).
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 2, 3]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();

		assert_eq!(to_drop.len(), 3);
		for entry in &to_drop {
			assert_eq!(entry.key.as_slice(), key);
			assert_eq!(entry.value_bytes, 1);
		}
	}

	// ==================== Pending version ====================

	#[test]
	fn test_pending_version_never_dropped() {
		// A pending (not yet stored) version is never returned, even when dropping everything.
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, Some(CommitVersion(20))).unwrap();

		// Results are ordered newest-first.
		assert_eq!(extract_dropped_versions(&to_drop), vec![10, 5, 1]);
	}

	#[test]
	fn test_pending_version_counts_toward_keep_last() {
		// Ranked desc: 20 (pending), 10, 5, 1. keep_last=1 protects only the pending
		// version, so every stored version is dropped.
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), Some(CommitVersion(20))).unwrap();

		assert_eq!(extract_dropped_versions(&to_drop), vec![10, 5, 1]);
	}

	#[test]
	fn test_pending_version_already_stored_not_duplicated() {
		// Pending version 10 is already stored. It must occupy a single keep slot:
		// ranked desc 10, 5, 1 with keep_last=2 keeps 10 and 5 and drops only 1.
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), Some(CommitVersion(10))).unwrap();

		assert_eq!(extract_dropped_versions(&to_drop), vec![1]);
	}

	#[test]
	fn test_pending_version_already_stored_is_kept() {
		// Even when the pending version already exists in storage, it is not dropped.
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, Some(CommitVersion(10))).unwrap();

		assert_eq!(extract_dropped_versions(&to_drop), vec![5, 1]);
	}

	#[test]
	fn test_pending_version_with_combined_constraints() {
		// Ranked desc: 20 (pending, idx 0), 10 (idx 1), 5 (idx 2), 1 (idx 3).
		// Drop only if version < 100 AND idx >= 2 → 5 and 1.
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(
			&storage,
			table,
			key,
			Some(CommitVersion(100)),
			Some(2),
			Some(CommitVersion(20)),
		)
		.unwrap();

		assert_eq!(extract_dropped_versions(&to_drop), vec![5, 1]);
	}
}