Skip to main content

sochdb_vector/
tombstones.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Tombstones for Logical Deletion (Task 6)
19//!
20//! This module implements logical deletion via tombstones for vector indexes.
21//! Deleted vectors remain in the index but are filtered out during retrieval.
22//!
23//! ## Design
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │                     TombstoneManager                             │
28//! ├─────────────────────────────────────────────────────────────────┤
29//! │  In-Memory:                                                      │
30//! │  ┌──────────────────────────────────────────────────────────┐   │
31//! │  │  Bitmap / HashSet for O(1) deletion checks                │   │
32//! │  │  RoaringBitmap for space efficiency                       │   │
33//! │  └──────────────────────────────────────────────────────────┘   │
34//! │                                                                  │
35//! │  On-Disk:                                                        │
36//! │  ┌──────────────────────────────────────────────────────────┐   │
37//! │  │  tombstones.bin: Append-only log of deleted IDs           │   │
38//! │  │  Format: [header][entry][entry]...                        │   │
39//! │  └──────────────────────────────────────────────────────────┘   │
40//! └─────────────────────────────────────────────────────────────────┘
41//! ```
42//!
43//! ## Filtering During Retrieval
44//!
45//! During vector search:
46//! 1. Get top-K candidates from HNSW
47//! 2. Filter out deleted IDs (O(1) per candidate)
48//! 3. Fetch more candidates if needed (over-fetch strategy)
49//!
50//! ## Future: Compaction
51//!
52//! Tombstones can be compacted during index rebuilds:
53//! 1. Build new index without deleted vectors
54//! 2. Swap indexes atomically
55//! 3. Discard tombstone file
56
57use std::collections::HashSet;
58use std::fs::{File, OpenOptions};
59use std::io::{self, BufReader, BufWriter, Read, Write};
60use std::path::{Path, PathBuf};
61use std::sync::Arc;
62use std::sync::atomic::{AtomicU64, Ordering};
63
64use parking_lot::RwLock;
65
66// ============================================================================
67// Tombstone ID Types
68// ============================================================================
69
70/// Vector ID type (matches the storage layer)
71pub type VectorId = u128;
72
73/// Internal ID type for compact storage
74pub type InternalId = u64;
75
76// ============================================================================
77// Tombstone Error Types
78// ============================================================================
79
80#[derive(Debug, thiserror::Error)]
81pub enum TombstoneError {
82    #[error("I/O error: {0}")]
83    Io(#[from] io::Error),
84
85    #[error("corrupted tombstone file: {0}")]
86    Corrupted(String),
87
88    #[error("tombstone file version mismatch: expected {expected}, got {actual}")]
89    VersionMismatch { expected: u32, actual: u32 },
90}
91
92pub type Result<T> = std::result::Result<T, TombstoneError>;
93
94// ============================================================================
95// Tombstone Manager
96// ============================================================================
97
98/// Manager for tombstone-based logical deletion
99///
100/// Provides O(1) deletion checks during vector retrieval.
101pub struct TombstoneManager {
102    /// Path to tombstone file
103    path: PathBuf,
104
105    /// In-memory set of deleted IDs
106    deleted: RwLock<HashSet<InternalId>>,
107
108    /// Count of deleted IDs
109    count: AtomicU64,
110
111    /// File handle for appending (if writable)
112    writer: Option<RwLock<BufWriter<File>>>,
113}
114
115/// Tombstone file header
116#[repr(C, packed)]
117#[derive(Debug, Clone, Copy)]
118struct TombstoneHeader {
119    /// Magic bytes: "TOMB"
120    magic: [u8; 4],
121    /// Version
122    version: u32,
123    /// Number of entries
124    count: u64,
125    /// Reserved for future use
126    _reserved: [u8; 16],
127}
128
129impl TombstoneHeader {
130    const MAGIC: [u8; 4] = *b"TOMB";
131    const VERSION: u32 = 1;
132    const SIZE: usize = std::mem::size_of::<Self>();
133
134    fn new(count: u64) -> Self {
135        Self {
136            magic: Self::MAGIC,
137            version: Self::VERSION,
138            count,
139            _reserved: [0u8; 16],
140        }
141    }
142
143    fn validate(&self) -> Result<()> {
144        if self.magic != Self::MAGIC {
145            return Err(TombstoneError::Corrupted("invalid magic bytes".to_string()));
146        }
147        if self.version != Self::VERSION {
148            return Err(TombstoneError::VersionMismatch {
149                expected: Self::VERSION,
150                actual: self.version,
151            });
152        }
153        Ok(())
154    }
155
156    fn to_bytes(&self) -> [u8; Self::SIZE] {
157        let mut bytes = [0u8; Self::SIZE];
158        bytes[0..4].copy_from_slice(&self.magic);
159        bytes[4..8].copy_from_slice(&self.version.to_le_bytes());
160        bytes[8..16].copy_from_slice(&self.count.to_le_bytes());
161        bytes
162    }
163
164    fn from_bytes(bytes: &[u8; Self::SIZE]) -> Self {
165        Self {
166            magic: bytes[0..4].try_into().unwrap(),
167            version: u32::from_le_bytes(bytes[4..8].try_into().unwrap()),
168            count: u64::from_le_bytes(bytes[8..16].try_into().unwrap()),
169            _reserved: bytes[16..32].try_into().unwrap(),
170        }
171    }
172}
173
174impl TombstoneManager {
175    /// Create a new tombstone manager
176    ///
177    /// If the file exists, loads existing tombstones.
178    /// If not, creates a new empty tombstone file.
179    pub fn new(path: impl AsRef<Path>, writable: bool) -> Result<Self> {
180        let path = path.as_ref().to_path_buf();
181
182        let (deleted, count, writer) = if path.exists() {
183            // Load existing tombstones
184            let (deleted, count) = Self::load_from_file(&path)?;
185
186            let writer = if writable {
187                let file = OpenOptions::new().append(true).open(&path)?;
188                Some(RwLock::new(BufWriter::new(file)))
189            } else {
190                None
191            };
192
193            (deleted, count, writer)
194        } else if writable {
195            // Create new file
196            let file = File::create(&path)?;
197            let mut writer = BufWriter::new(file);
198
199            // Write header
200            let header = TombstoneHeader::new(0);
201            writer.write_all(&header.to_bytes())?;
202            writer.flush()?;
203
204            // Reopen for append
205            drop(writer);
206            let file = OpenOptions::new().append(true).open(&path)?;
207
208            (HashSet::new(), 0, Some(RwLock::new(BufWriter::new(file))))
209        } else {
210            (HashSet::new(), 0, None)
211        };
212
213        Ok(Self {
214            path,
215            deleted: RwLock::new(deleted),
216            count: AtomicU64::new(count),
217            writer,
218        })
219    }
220
221    /// Load tombstones from file
222    fn load_from_file(path: &Path) -> Result<(HashSet<InternalId>, u64)> {
223        let file = File::open(path)?;
224        let mut reader = BufReader::new(file);
225
226        // Read header
227        let mut header_bytes = [0u8; TombstoneHeader::SIZE];
228        reader.read_exact(&mut header_bytes)?;
229        let header = TombstoneHeader::from_bytes(&header_bytes);
230        header.validate()?;
231
232        // Read entries
233        let mut deleted = HashSet::with_capacity(header.count as usize);
234        let mut id_bytes = [0u8; 8];
235        let mut count = 0u64;
236
237        while reader.read_exact(&mut id_bytes).is_ok() {
238            let id = u64::from_le_bytes(id_bytes);
239            deleted.insert(id);
240            count += 1;
241        }
242
243        Ok((deleted, count))
244    }
245
246    /// Mark an ID as deleted
247    pub fn delete(&self, id: InternalId) -> Result<bool> {
248        // Check if already deleted
249        {
250            let deleted = self.deleted.read();
251            if deleted.contains(&id) {
252                return Ok(false); // Already deleted
253            }
254        }
255
256        // Add to in-memory set
257        {
258            let mut deleted = self.deleted.write();
259            if !deleted.insert(id) {
260                return Ok(false); // Already deleted
261            }
262        }
263
264        // Append to file
265        if let Some(ref writer) = self.writer {
266            let mut writer = writer.write();
267            writer.write_all(&id.to_le_bytes())?;
268            writer.flush()?;
269        }
270
271        self.count.fetch_add(1, Ordering::Relaxed);
272        Ok(true)
273    }
274
275    /// Mark multiple IDs as deleted
276    pub fn delete_batch(&self, ids: &[InternalId]) -> Result<usize> {
277        let mut new_deletions = Vec::new();
278
279        // Add to in-memory set
280        {
281            let mut deleted = self.deleted.write();
282            for &id in ids {
283                if deleted.insert(id) {
284                    new_deletions.push(id);
285                }
286            }
287        }
288
289        if new_deletions.is_empty() {
290            return Ok(0);
291        }
292
293        // Append to file
294        if let Some(ref writer) = self.writer {
295            let mut writer = writer.write();
296            for id in &new_deletions {
297                writer.write_all(&id.to_le_bytes())?;
298            }
299            writer.flush()?;
300        }
301
302        let count = new_deletions.len();
303        self.count.fetch_add(count as u64, Ordering::Relaxed);
304        Ok(count)
305    }
306
307    /// Check if an ID is deleted
308    #[inline]
309    pub fn is_deleted(&self, id: InternalId) -> bool {
310        self.deleted.read().contains(&id)
311    }
312
313    /// Check multiple IDs for deletion
314    pub fn filter_deleted(&self, ids: &[InternalId]) -> Vec<InternalId> {
315        let deleted = self.deleted.read();
316        ids.iter()
317            .copied()
318            .filter(|id| !deleted.contains(id))
319            .collect()
320    }
321
322    /// Get the count of deleted IDs
323    pub fn count(&self) -> u64 {
324        self.count.load(Ordering::Relaxed)
325    }
326
327    /// Get all deleted IDs (for compaction)
328    pub fn all_deleted(&self) -> Vec<InternalId> {
329        self.deleted.read().iter().copied().collect()
330    }
331
332    /// Sync to disk
333    pub fn sync(&self) -> Result<()> {
334        if let Some(ref writer) = self.writer {
335            writer.write().flush()?;
336        }
337        Ok(())
338    }
339
340    /// Compact the tombstone file (rewrites without duplicates)
341    pub fn compact(&self) -> Result<()> {
342        let deleted: Vec<_> = self.deleted.read().iter().copied().collect();
343
344        // Write to temporary file
345        let temp_path = self.path.with_extension("tmp");
346        {
347            let file = File::create(&temp_path)?;
348            let mut writer = BufWriter::new(file);
349
350            // Write header
351            let header = TombstoneHeader::new(deleted.len() as u64);
352            writer.write_all(&header.to_bytes())?;
353
354            // Write entries
355            for id in &deleted {
356                writer.write_all(&id.to_le_bytes())?;
357            }
358            writer.flush()?;
359        }
360
361        // Atomic rename
362        std::fs::rename(&temp_path, &self.path)?;
363
364        Ok(())
365    }
366}
367
368// ============================================================================
369// Tombstone Filter for Vector Search
370// ============================================================================
371
372/// Filter for vector search results that excludes deleted IDs
373pub struct TombstoneFilter {
374    manager: Arc<TombstoneManager>,
375    /// Over-fetch factor: fetch this many extra candidates to account for deletions
376    overfetch_factor: f32,
377}
378
379impl TombstoneFilter {
380    /// Create a new tombstone filter
381    pub fn new(manager: Arc<TombstoneManager>) -> Self {
382        Self {
383            manager,
384            overfetch_factor: 1.2, // 20% over-fetch by default
385        }
386    }
387
388    /// Set the over-fetch factor
389    pub fn with_overfetch(mut self, factor: f32) -> Self {
390        self.overfetch_factor = factor.max(1.0);
391        self
392    }
393
394    /// Calculate how many candidates to fetch given the target K
395    pub fn effective_k(&self, k: usize) -> usize {
396        let deletion_rate = self.deletion_rate();
397        if deletion_rate < 0.01 {
398            // Very few deletions, minimal over-fetch
399            (k as f32 * 1.05).ceil() as usize
400        } else {
401            // Estimate how many extra we need
402            let factor = 1.0 / (1.0 - deletion_rate);
403            (k as f32 * factor * self.overfetch_factor).ceil() as usize
404        }
405    }
406
407    /// Get the current deletion rate (for adaptive over-fetch)
408    fn deletion_rate(&self) -> f32 {
409        // This would ideally use total_vectors / deleted_count
410        // For now, use a conservative estimate
411        0.1
412    }
413
414    /// Filter search results, removing deleted IDs
415    pub fn filter<T>(&self, results: Vec<(InternalId, T)>, limit: usize) -> Vec<(InternalId, T)> {
416        results
417            .into_iter()
418            .filter(|(id, _)| !self.manager.is_deleted(*id))
419            .take(limit)
420            .collect()
421    }
422
423    /// Filter and check if we need more candidates
424    pub fn filter_with_continuation<T>(
425        &self,
426        results: Vec<(InternalId, T)>,
427        limit: usize,
428    ) -> (Vec<(InternalId, T)>, bool) {
429        let filtered: Vec<_> = results
430            .into_iter()
431            .filter(|(id, _)| !self.manager.is_deleted(*id))
432            .take(limit)
433            .collect();
434
435        let need_more = filtered.len() < limit;
436        (filtered, need_more)
437    }
438}
439
440// ============================================================================
441// Tests
442// ============================================================================
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447    use tempfile::TempDir;
448
449    fn temp_tombstone() -> (TempDir, TombstoneManager) {
450        let temp_dir = TempDir::new().unwrap();
451        let path = temp_dir.path().join("tombstones.bin");
452        let manager = TombstoneManager::new(&path, true).unwrap();
453        (temp_dir, manager)
454    }
455
456    #[test]
457    fn test_delete_single() {
458        let (_temp, manager) = temp_tombstone();
459
460        assert!(!manager.is_deleted(1));
461
462        assert!(manager.delete(1).unwrap());
463        assert!(manager.is_deleted(1));
464
465        // Double delete returns false
466        assert!(!manager.delete(1).unwrap());
467    }
468
469    #[test]
470    fn test_delete_batch() {
471        let (_temp, manager) = temp_tombstone();
472
473        let count = manager.delete_batch(&[1, 2, 3, 4, 5]).unwrap();
474        assert_eq!(count, 5);
475
476        for id in 1..=5 {
477            assert!(manager.is_deleted(id));
478        }
479        assert!(!manager.is_deleted(6));
480
481        // Partial overlap
482        let count = manager.delete_batch(&[4, 5, 6, 7]).unwrap();
483        assert_eq!(count, 2); // Only 6 and 7 are new
484    }
485
486    #[test]
487    fn test_filter_deleted() {
488        let (_temp, manager) = temp_tombstone();
489
490        manager.delete_batch(&[2, 4, 6, 8]).unwrap();
491
492        let ids: Vec<_> = (1..=10).collect();
493        let filtered = manager.filter_deleted(&ids);
494
495        assert_eq!(filtered, vec![1, 3, 5, 7, 9, 10]);
496    }
497
498    #[test]
499    fn test_persistence() {
500        let temp_dir = TempDir::new().unwrap();
501        let path = temp_dir.path().join("tombstones.bin");
502
503        // Create and delete some IDs
504        {
505            let manager = TombstoneManager::new(&path, true).unwrap();
506            manager.delete_batch(&[1, 2, 3]).unwrap();
507            manager.sync().unwrap();
508        }
509
510        // Reload and verify
511        {
512            let manager = TombstoneManager::new(&path, false).unwrap();
513            assert!(manager.is_deleted(1));
514            assert!(manager.is_deleted(2));
515            assert!(manager.is_deleted(3));
516            assert!(!manager.is_deleted(4));
517            assert_eq!(manager.count(), 3);
518        }
519    }
520
521    #[test]
522    fn test_compact() {
523        let temp_dir = TempDir::new().unwrap();
524        let path = temp_dir.path().join("tombstones.bin");
525
526        {
527            let manager = TombstoneManager::new(&path, true).unwrap();
528
529            // Delete same IDs multiple times (simulating append-only)
530            for _ in 0..5 {
531                manager.delete_batch(&[1, 2, 3]).unwrap();
532            }
533
534            manager.compact().unwrap();
535        }
536
537        // Verify compacted file
538        let manager = TombstoneManager::new(&path, false).unwrap();
539        assert_eq!(manager.count(), 3);
540    }
541
542    #[test]
543    fn test_tombstone_filter() {
544        let (_temp, manager) = temp_tombstone();
545        let manager = Arc::new(manager);
546
547        // Delete IDs 2 and 4
548        manager.delete_batch(&[2, 4]).unwrap();
549
550        let filter = TombstoneFilter::new(manager);
551
552        // Search results with deleted items
553        let results: Vec<(InternalId, f32)> = vec![
554            (1, 0.9),
555            (2, 0.8), // Deleted
556            (3, 0.7),
557            (4, 0.6), // Deleted
558            (5, 0.5),
559        ];
560
561        let filtered = filter.filter(results, 3);
562        assert_eq!(filtered.len(), 3);
563        assert_eq!(filtered[0].0, 1);
564        assert_eq!(filtered[1].0, 3);
565        assert_eq!(filtered[2].0, 5);
566    }
567
568    #[test]
569    fn test_effective_k() {
570        let (_temp, manager) = temp_tombstone();
571        let manager = Arc::new(manager);
572
573        let filter = TombstoneFilter::new(manager);
574
575        // With default settings, should over-fetch
576        let k = filter.effective_k(10);
577        assert!(k > 10);
578    }
579}