1use std::collections::HashSet;
58use std::fs::{File, OpenOptions};
59use std::io::{self, BufReader, BufWriter, Read, Write};
60use std::path::{Path, PathBuf};
61use std::sync::Arc;
62use std::sync::atomic::{AtomicU64, Ordering};
63
64use parking_lot::RwLock;
65
/// 128-bit vector identifier.
///
/// NOTE(review): not referenced elsewhere in this section — presumably the
/// externally visible id that maps onto an [`InternalId`]; confirm.
pub type VectorId = u128;

/// Internal 64-bit identifier; tombstones are recorded and looked up by this id
/// and persisted as little-endian 8-byte records.
pub type InternalId = u64;
75
/// Errors produced while loading, updating, or compacting a tombstone file.
#[derive(Debug, thiserror::Error)]
pub enum TombstoneError {
    /// An underlying filesystem operation failed.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),

    /// The file's contents are not a valid tombstone file (e.g. wrong magic bytes).
    #[error("corrupted tombstone file: {0}")]
    Corrupted(String),

    /// The file header declares a format version this code does not support.
    #[error("tombstone file version mismatch: expected {expected}, got {actual}")]
    VersionMismatch { expected: u32, actual: u32 },
}

/// Result alias used by the tombstone module.
pub type Result<T> = std::result::Result<T, TombstoneError>;
93
/// Tracks deleted ("tombstoned") internal ids.
///
/// The complete set of deleted ids is held in memory. When opened writable,
/// each new deletion is also appended to the file at `path` as a little-endian
/// `u64` record following a fixed-size header.
pub struct TombstoneManager {
    /// Location of the tombstone file on disk.
    path: PathBuf,

    /// In-memory set of every deleted id.
    deleted: RwLock<HashSet<InternalId>>,

    /// Number of deletions tracked; reported by `count()`.
    count: AtomicU64,

    /// Append-mode handle to the file; `None` when the manager is read-only.
    writer: Option<RwLock<BufWriter<File>>>,
}
114
115#[repr(C, packed)]
117#[derive(Debug, Clone, Copy)]
118struct TombstoneHeader {
119 magic: [u8; 4],
121 version: u32,
123 count: u64,
125 _reserved: [u8; 16],
127}
128
129impl TombstoneHeader {
130 const MAGIC: [u8; 4] = *b"TOMB";
131 const VERSION: u32 = 1;
132 const SIZE: usize = std::mem::size_of::<Self>();
133
134 fn new(count: u64) -> Self {
135 Self {
136 magic: Self::MAGIC,
137 version: Self::VERSION,
138 count,
139 _reserved: [0u8; 16],
140 }
141 }
142
143 fn validate(&self) -> Result<()> {
144 if self.magic != Self::MAGIC {
145 return Err(TombstoneError::Corrupted("invalid magic bytes".to_string()));
146 }
147 if self.version != Self::VERSION {
148 return Err(TombstoneError::VersionMismatch {
149 expected: Self::VERSION,
150 actual: self.version,
151 });
152 }
153 Ok(())
154 }
155
156 fn to_bytes(&self) -> [u8; Self::SIZE] {
157 let mut bytes = [0u8; Self::SIZE];
158 bytes[0..4].copy_from_slice(&self.magic);
159 bytes[4..8].copy_from_slice(&self.version.to_le_bytes());
160 bytes[8..16].copy_from_slice(&self.count.to_le_bytes());
161 bytes
162 }
163
164 fn from_bytes(bytes: &[u8; Self::SIZE]) -> Self {
165 Self {
166 magic: bytes[0..4].try_into().unwrap(),
167 version: u32::from_le_bytes(bytes[4..8].try_into().unwrap()),
168 count: u64::from_le_bytes(bytes[8..16].try_into().unwrap()),
169 _reserved: bytes[16..32].try_into().unwrap(),
170 }
171 }
172}
173
174impl TombstoneManager {
175 pub fn new(path: impl AsRef<Path>, writable: bool) -> Result<Self> {
180 let path = path.as_ref().to_path_buf();
181
182 let (deleted, count, writer) = if path.exists() {
183 let (deleted, count) = Self::load_from_file(&path)?;
185
186 let writer = if writable {
187 let file = OpenOptions::new().append(true).open(&path)?;
188 Some(RwLock::new(BufWriter::new(file)))
189 } else {
190 None
191 };
192
193 (deleted, count, writer)
194 } else if writable {
195 let file = File::create(&path)?;
197 let mut writer = BufWriter::new(file);
198
199 let header = TombstoneHeader::new(0);
201 writer.write_all(&header.to_bytes())?;
202 writer.flush()?;
203
204 drop(writer);
206 let file = OpenOptions::new().append(true).open(&path)?;
207
208 (HashSet::new(), 0, Some(RwLock::new(BufWriter::new(file))))
209 } else {
210 (HashSet::new(), 0, None)
211 };
212
213 Ok(Self {
214 path,
215 deleted: RwLock::new(deleted),
216 count: AtomicU64::new(count),
217 writer,
218 })
219 }
220
221 fn load_from_file(path: &Path) -> Result<(HashSet<InternalId>, u64)> {
223 let file = File::open(path)?;
224 let mut reader = BufReader::new(file);
225
226 let mut header_bytes = [0u8; TombstoneHeader::SIZE];
228 reader.read_exact(&mut header_bytes)?;
229 let header = TombstoneHeader::from_bytes(&header_bytes);
230 header.validate()?;
231
232 let mut deleted = HashSet::with_capacity(header.count as usize);
234 let mut id_bytes = [0u8; 8];
235 let mut count = 0u64;
236
237 while reader.read_exact(&mut id_bytes).is_ok() {
238 let id = u64::from_le_bytes(id_bytes);
239 deleted.insert(id);
240 count += 1;
241 }
242
243 Ok((deleted, count))
244 }
245
246 pub fn delete(&self, id: InternalId) -> Result<bool> {
248 {
250 let deleted = self.deleted.read();
251 if deleted.contains(&id) {
252 return Ok(false); }
254 }
255
256 {
258 let mut deleted = self.deleted.write();
259 if !deleted.insert(id) {
260 return Ok(false); }
262 }
263
264 if let Some(ref writer) = self.writer {
266 let mut writer = writer.write();
267 writer.write_all(&id.to_le_bytes())?;
268 writer.flush()?;
269 }
270
271 self.count.fetch_add(1, Ordering::Relaxed);
272 Ok(true)
273 }
274
275 pub fn delete_batch(&self, ids: &[InternalId]) -> Result<usize> {
277 let mut new_deletions = Vec::new();
278
279 {
281 let mut deleted = self.deleted.write();
282 for &id in ids {
283 if deleted.insert(id) {
284 new_deletions.push(id);
285 }
286 }
287 }
288
289 if new_deletions.is_empty() {
290 return Ok(0);
291 }
292
293 if let Some(ref writer) = self.writer {
295 let mut writer = writer.write();
296 for id in &new_deletions {
297 writer.write_all(&id.to_le_bytes())?;
298 }
299 writer.flush()?;
300 }
301
302 let count = new_deletions.len();
303 self.count.fetch_add(count as u64, Ordering::Relaxed);
304 Ok(count)
305 }
306
307 #[inline]
309 pub fn is_deleted(&self, id: InternalId) -> bool {
310 self.deleted.read().contains(&id)
311 }
312
313 pub fn filter_deleted(&self, ids: &[InternalId]) -> Vec<InternalId> {
315 let deleted = self.deleted.read();
316 ids.iter()
317 .copied()
318 .filter(|id| !deleted.contains(id))
319 .collect()
320 }
321
322 pub fn count(&self) -> u64 {
324 self.count.load(Ordering::Relaxed)
325 }
326
327 pub fn all_deleted(&self) -> Vec<InternalId> {
329 self.deleted.read().iter().copied().collect()
330 }
331
332 pub fn sync(&self) -> Result<()> {
334 if let Some(ref writer) = self.writer {
335 writer.write().flush()?;
336 }
337 Ok(())
338 }
339
340 pub fn compact(&self) -> Result<()> {
342 let deleted: Vec<_> = self.deleted.read().iter().copied().collect();
343
344 let temp_path = self.path.with_extension("tmp");
346 {
347 let file = File::create(&temp_path)?;
348 let mut writer = BufWriter::new(file);
349
350 let header = TombstoneHeader::new(deleted.len() as u64);
352 writer.write_all(&header.to_bytes())?;
353
354 for id in &deleted {
356 writer.write_all(&id.to_le_bytes())?;
357 }
358 writer.flush()?;
359 }
360
361 std::fs::rename(&temp_path, &self.path)?;
363
364 Ok(())
365 }
366}
367
/// Removes deleted ids from search results and sizes over-fetch requests
/// so that enough live results remain after filtering.
pub struct TombstoneFilter {
    /// Shared tombstone state consulted for every result id.
    manager: Arc<TombstoneManager>,
    /// Multiplier applied by `effective_k` when the deletion rate is at least 1%.
    /// `new` sets it to 1.2; `with_overfetch` clamps it to at least 1.0.
    overfetch_factor: f32,
}
378
379impl TombstoneFilter {
380 pub fn new(manager: Arc<TombstoneManager>) -> Self {
382 Self {
383 manager,
384 overfetch_factor: 1.2, }
386 }
387
388 pub fn with_overfetch(mut self, factor: f32) -> Self {
390 self.overfetch_factor = factor.max(1.0);
391 self
392 }
393
394 pub fn effective_k(&self, k: usize) -> usize {
396 let deletion_rate = self.deletion_rate();
397 if deletion_rate < 0.01 {
398 (k as f32 * 1.05).ceil() as usize
400 } else {
401 let factor = 1.0 / (1.0 - deletion_rate);
403 (k as f32 * factor * self.overfetch_factor).ceil() as usize
404 }
405 }
406
407 fn deletion_rate(&self) -> f32 {
409 0.1
412 }
413
414 pub fn filter<T>(&self, results: Vec<(InternalId, T)>, limit: usize) -> Vec<(InternalId, T)> {
416 results
417 .into_iter()
418 .filter(|(id, _)| !self.manager.is_deleted(*id))
419 .take(limit)
420 .collect()
421 }
422
423 pub fn filter_with_continuation<T>(
425 &self,
426 results: Vec<(InternalId, T)>,
427 limit: usize,
428 ) -> (Vec<(InternalId, T)>, bool) {
429 let filtered: Vec<_> = results
430 .into_iter()
431 .filter(|(id, _)| !self.manager.is_deleted(*id))
432 .take(limit)
433 .collect();
434
435 let need_more = filtered.len() < limit;
436 (filtered, need_more)
437 }
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Location of the tombstone file inside a scratch directory.
    fn tombstone_path(dir: &TempDir) -> PathBuf {
        dir.path().join("tombstones.bin")
    }

    /// Fresh writable manager in its own temporary directory.
    ///
    /// The `TempDir` is returned so that the directory outlives the manager.
    fn temp_tombstone() -> (TempDir, TombstoneManager) {
        let dir = TempDir::new().unwrap();
        let manager = TombstoneManager::new(tombstone_path(&dir), true).unwrap();
        (dir, manager)
    }

    #[test]
    fn test_delete_single() {
        let (_dir, manager) = temp_tombstone();

        assert!(!manager.is_deleted(1));
        assert!(manager.delete(1).unwrap(), "first delete reports a new tombstone");
        assert!(manager.is_deleted(1));
        assert!(!manager.delete(1).unwrap(), "repeat delete is a no-op");
    }

    #[test]
    fn test_delete_batch() {
        let (_dir, manager) = temp_tombstone();

        assert_eq!(manager.delete_batch(&[1, 2, 3, 4, 5]).unwrap(), 5);
        assert!((1..=5).all(|id| manager.is_deleted(id)));
        assert!(!manager.is_deleted(6));

        // 4 and 5 are already tombstoned; only 6 and 7 are new.
        assert_eq!(manager.delete_batch(&[4, 5, 6, 7]).unwrap(), 2);
    }

    #[test]
    fn test_filter_deleted() {
        let (_dir, manager) = temp_tombstone();
        manager.delete_batch(&[2, 4, 6, 8]).unwrap();

        let candidates: Vec<InternalId> = (1..=10).collect();
        assert_eq!(manager.filter_deleted(&candidates), vec![1, 3, 5, 7, 9, 10]);
    }

    #[test]
    fn test_persistence() {
        let dir = TempDir::new().unwrap();
        let path = tombstone_path(&dir);

        let writer = TombstoneManager::new(&path, true).unwrap();
        writer.delete_batch(&[1, 2, 3]).unwrap();
        writer.sync().unwrap();
        drop(writer);

        let reader = TombstoneManager::new(&path, false).unwrap();
        for id in 1..=3 {
            assert!(reader.is_deleted(id), "id {id} should survive a reopen");
        }
        assert!(!reader.is_deleted(4));
        assert_eq!(reader.count(), 3);
    }

    #[test]
    fn test_compact() {
        let dir = TempDir::new().unwrap();
        let path = tombstone_path(&dir);

        {
            let manager = TombstoneManager::new(&path, true).unwrap();
            // Repeating the same batch only records each id once.
            for _round in 0..5 {
                manager.delete_batch(&[1, 2, 3]).unwrap();
            }
            manager.compact().unwrap();
        }

        let reopened = TombstoneManager::new(&path, false).unwrap();
        assert_eq!(reopened.count(), 3);
    }

    #[test]
    fn test_tombstone_filter() {
        let (_dir, manager) = temp_tombstone();
        manager.delete_batch(&[2, 4]).unwrap();
        let filter = TombstoneFilter::new(Arc::new(manager));

        let results: Vec<(InternalId, f32)> =
            vec![(1, 0.9), (2, 0.8), (3, 0.7), (4, 0.6), (5, 0.5)];
        let kept: Vec<InternalId> = filter
            .filter(results, 3)
            .into_iter()
            .map(|(id, _score)| id)
            .collect();

        assert_eq!(kept, vec![1, 3, 5]);
    }

    #[test]
    fn test_effective_k() {
        let (_dir, manager) = temp_tombstone();
        let filter = TombstoneFilter::new(Arc::new(manager));

        // Over-fetching must request strictly more candidates than asked for.
        assert!(filter.effective_k(10) > 10);
    }
}