Skip to main content

reifydb_store_multi/store/
drop.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{common::CommitVersion, interface::store::EntryKind};
5use reifydb_type::util::cowvec::CowVec;
6
7use crate::{Result, tier::TierStorage};
8
9/// Information about an entry to be dropped.
10#[derive(Debug, Clone)]
11pub struct DropEntry {
12	/// The logical key to delete
13	pub key: CowVec<u8>,
14	/// The specific version to delete
15	pub version: CommitVersion,
16	/// The size of the value being dropped (for metrics tracking)
17	pub value_bytes: u64,
18}
19
20/// Find historical versioned keys to drop.
21///
22/// Always keeps the most recent version and drops everything else.
23///
24/// # Arguments
25/// - `storage`: The storage backend to scan
26/// - `table`: The table containing the keys
27/// - `key`: The logical key (without version suffix)
28/// - `pending_version`: Version being written in the same batch (to avoid race)
29pub(crate) fn find_keys_to_drop<S: TierStorage>(
30	storage: &S,
31	table: EntryKind,
32	key: &[u8],
33	pending_version: Option<CommitVersion>,
34) -> Result<Vec<DropEntry>> {
35	// Get all versions of this key directly (bypasses MVCC resolution)
36	let all_versions = storage.get_all_versions(table, key)?;
37
38	// Collect all versions with their value sizes
39	let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
40		.into_iter()
41		.map(|(version, value)| {
42			let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
43			(version, value_bytes)
44		})
45		.collect();
46
47	// Include pending version if provided (version being written in current batch)
48	// This prevents a race where Drop scans storage before Set is written
49	if let Some(pending_ver) = pending_version {
50		// Check if pending version already exists (avoid duplicates)
51		if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
52			// Add a placeholder entry for the pending version
53			// value_bytes=0 is fine since this entry will never be dropped (it's the newest)
54			versioned_entries.push((pending_ver, 0));
55		}
56	}
57
58	// Sort by version descending (most recent first)
59	versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
60
61	// Determine which entries to drop: Keep only index 0 (the latest), drop all others
62	let mut entries_to_drop = Vec::new();
63	let key_cow = CowVec::new(key.to_vec());
64
65	for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
66		// Aggressive cleanup: Drop everything except the most recent version
67		let should_drop = idx > 0;
68
69		if should_drop {
70			// Never drop the pending version (it's being written in this batch)
71			if Some(entry_version) == pending_version {
72				continue;
73			}
74
75			entries_to_drop.push(DropEntry {
76				key: key_cow.clone(),
77				version: entry_version,
78				value_bytes,
79			});
80		}
81	}
82
83	Ok(entries_to_drop)
84}
85
#[cfg(test)]
pub mod tests {
	use std::collections::HashMap;

	use super::*;
	use crate::hot::storage::HotStorage;

	/// Store one entry for `key` at each listed version; the value is the version truncated to a single byte.
	fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
		for &version in versions {
			let value = CowVec::new(vec![version as u8]);
			let batch = HashMap::from([(table, vec![(CowVec::new(key.to_vec()), Some(value))])]);
			storage.set(CommitVersion(version), batch).unwrap();
		}
	}

	/// Collect the raw version numbers of the given drop entries, in their original order.
	fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
		let mut versions = Vec::with_capacity(entries.len());
		for entry in entries {
			versions.push(entry.version.0);
		}
		versions
	}

	#[test]
	fn test_drop_historical_versions() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Stored versions: 1, 5, 10, 20, 100
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		let to_drop = find_keys_to_drop(&storage, table, key, None).unwrap();

		// Everything except the newest version (100) must be selected.
		let mut versions = extract_dropped_versions(&to_drop);
		versions.sort_unstable();
		assert_eq!(versions, vec![1, 5, 10, 20]);
	}

	#[test]
	fn test_keep_latest_with_pending() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Stored versions: 1, 5, 10. Version 20 is pending and not stored.
		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(20))).unwrap();

		// The pending version (20) is kept; all stored versions are selected.
		let mut versions = extract_dropped_versions(&to_drop);
		versions.sort_unstable();
		assert_eq!(versions, vec![1, 5, 10]);
	}

	#[test]
	fn test_single_version_no_drop() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[42]);

		// A lone version is the newest one, so nothing may be selected.
		let to_drop = find_keys_to_drop(&storage, table, key, None).unwrap();
		assert_eq!(to_drop.len(), 0);
	}

	#[test]
	fn test_empty_storage() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"nonexistent";

		// No versions exist for the key, so nothing may be selected.
		let to_drop = find_keys_to_drop(&storage, table, key, None).unwrap();
		assert_eq!(to_drop.len(), 0);
	}
}