1use crate::{DiskANN, DiskAnnError, DiskAnnParams};
55use anndists::prelude::Distance;
56use rayon::prelude::*;
57use std::collections::{BinaryHeap, HashSet};
58use std::cmp::{Ordering, Reverse};
59use std::sync::RwLock;
60
/// Tuning knobs for the incremental (base + delta + tombstones) index.
#[derive(Clone, Copy, Debug)]
pub struct IncrementalConfig {
    /// Delta-layer size (vector count) at which `should_compact` reports true.
    pub delta_threshold: usize,
    /// Tombstone-to-total ratio at which `should_compact` reports true.
    pub tombstone_ratio_threshold: f32,
    /// Build parameters for the delta layer. NOTE(review): only `max_degree`
    /// is consumed by the delta layer in this file; `build_beam_width` and
    /// `alpha` appear unused here (pruning hard-codes alpha = 1.2) — confirm.
    pub delta_params: DiskAnnParams,
}
71
72impl Default for IncrementalConfig {
73 fn default() -> Self {
74 Self {
75 delta_threshold: 10_000,
76 tombstone_ratio_threshold: 0.1,
77 delta_params: DiskAnnParams {
78 max_degree: 32, build_beam_width: 64,
80 alpha: 1.2,
81 },
82 }
83 }
84}
85
86#[derive(Clone, Copy)]
88struct Candidate {
89 dist: f32,
90 id: u64, }
92
93impl PartialEq for Candidate {
94 fn eq(&self, other: &Self) -> bool {
95 self.dist == other.dist && self.id == other.id
96 }
97}
98impl Eq for Candidate {}
99impl PartialOrd for Candidate {
100 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
101 self.dist.partial_cmp(&other.dist)
102 }
103}
104impl Ord for Candidate {
105 fn cmp(&self, other: &Self) -> Ordering {
106 self.partial_cmp(other).unwrap_or(Ordering::Equal)
107 }
108}
109
/// In-memory graph index holding vectors added since the last compaction.
struct DeltaLayer {
    // Raw vectors, indexed by local id (position).
    vectors: Vec<Vec<f32>>,
    // Adjacency lists, parallel to `vectors`; entries are local ids.
    graph: Vec<Vec<u32>>,
    // Local id of the search entry node; `None` while the layer is empty.
    entry_point: Option<u32>,
    // Maximum out-degree kept per node after pruning.
    max_degree: usize,
}
122
123impl DeltaLayer {
124 fn new(max_degree: usize) -> Self {
125 Self {
126 vectors: Vec::new(),
127 graph: Vec::new(),
128 entry_point: None,
129 max_degree,
130 }
131 }
132
133 fn len(&self) -> usize {
134 self.vectors.len()
135 }
136
137 fn is_empty(&self) -> bool {
138 self.vectors.is_empty()
139 }
140
141 fn add_vectors<D: Distance<f32> + Copy + Sync>(
143 &mut self,
144 vectors: &[Vec<f32>],
145 dist: D,
146 ) -> Vec<u64> {
147 let start_idx = self.vectors.len();
148 let mut new_ids = Vec::with_capacity(vectors.len());
149
150 for (i, v) in vectors.iter().enumerate() {
151 let local_idx = start_idx + i;
152 let global_id = DELTA_ID_OFFSET + local_idx as u64;
154 new_ids.push(global_id);
155
156 self.vectors.push(v.clone());
157 self.graph.push(Vec::new());
158
159 if local_idx > 0 {
161 let neighbors = self.find_and_prune_neighbors(local_idx, dist);
162 self.graph[local_idx] = neighbors.clone();
163
164 for &nb in &neighbors {
166 let nb_idx = nb as usize;
167 if !self.graph[nb_idx].contains(&(local_idx as u32))
168 && self.graph[nb_idx].len() < self.max_degree
169 {
170 self.graph[nb_idx].push(local_idx as u32);
171 }
172 }
173 }
174
175 if self.entry_point.is_none() {
177 self.entry_point = Some(0);
178 }
179 }
180
181 if self.vectors.len() > 1 {
183 self.entry_point = Some(self.compute_medoid(dist));
184 }
185
186 new_ids
187 }
188
189 fn compute_medoid<D: Distance<f32> + Copy + Sync>(&self, dist: D) -> u32 {
190 if self.vectors.is_empty() {
191 return 0;
192 }
193
194 let dim = self.vectors[0].len();
196 let mut centroid = vec![0.0f32; dim];
197 for v in &self.vectors {
198 for (i, &val) in v.iter().enumerate() {
199 centroid[i] += val;
200 }
201 }
202 for val in &mut centroid {
203 *val /= self.vectors.len() as f32;
204 }
205
206 let (best_idx, _) = self.vectors
208 .iter()
209 .enumerate()
210 .map(|(idx, v)| (idx, dist.eval(¢roid, v)))
211 .min_by(|a, b| a.1.partial_cmp(&b.1).unwrap())
212 .unwrap_or((0, f32::MAX));
213
214 best_idx as u32
215 }
216
217 fn find_and_prune_neighbors<D: Distance<f32> + Copy>(
218 &self,
219 node_idx: usize,
220 dist: D,
221 ) -> Vec<u32> {
222 let query = &self.vectors[node_idx];
223 let beam_width = (self.max_degree * 2).max(16);
224
225 let candidates = if let Some(entry) = self.entry_point {
227 self.greedy_search_internal(query, entry as usize, beam_width, dist)
228 } else {
229 self.vectors.iter()
231 .enumerate()
232 .filter(|(i, _)| *i != node_idx)
233 .map(|(i, v)| (i as u32, dist.eval(query, v)))
234 .collect()
235 };
236
237 self.prune_neighbors(node_idx, &candidates, dist)
239 }
240
241 fn greedy_search_internal<D: Distance<f32> + Copy>(
242 &self,
243 query: &[f32],
244 start: usize,
245 beam_width: usize,
246 dist: D,
247 ) -> Vec<(u32, f32)> {
248 if self.vectors.is_empty() || start >= self.vectors.len() {
249 return Vec::new();
250 }
251
252 let mut visited = HashSet::new();
253 let mut frontier: BinaryHeap<Reverse<Candidate>> = BinaryHeap::new();
254 let mut results: BinaryHeap<Candidate> = BinaryHeap::new();
255
256 let start_dist = dist.eval(query, &self.vectors[start]);
257 let start_cand = Candidate { dist: start_dist, id: start as u64 };
258 frontier.push(Reverse(start_cand));
259 results.push(start_cand);
260 visited.insert(start);
261
262 while let Some(Reverse(best)) = frontier.peek().copied() {
263 if results.len() >= beam_width {
264 if let Some(worst) = results.peek() {
265 if best.dist >= worst.dist {
266 break;
267 }
268 }
269 }
270 let Reverse(current) = frontier.pop().unwrap();
271 let cur_idx = current.id as usize;
272
273 if cur_idx < self.graph.len() {
274 for &nb in &self.graph[cur_idx] {
275 let nb_idx = nb as usize;
276 if !visited.insert(nb_idx) {
277 continue;
278 }
279 if nb_idx >= self.vectors.len() {
280 continue;
281 }
282
283 let d = dist.eval(query, &self.vectors[nb_idx]);
284 let cand = Candidate { dist: d, id: nb as u64 };
285
286 if results.len() < beam_width {
287 results.push(cand);
288 frontier.push(Reverse(cand));
289 } else if d < results.peek().unwrap().dist {
290 results.pop();
291 results.push(cand);
292 frontier.push(Reverse(cand));
293 }
294 }
295 }
296 }
297
298 results.into_vec()
299 .into_iter()
300 .map(|c| (c.id as u32, c.dist))
301 .collect()
302 }
303
304 fn prune_neighbors<D: Distance<f32> + Copy>(
305 &self,
306 node_idx: usize,
307 candidates: &[(u32, f32)],
308 dist: D,
309 ) -> Vec<u32> {
310 if candidates.is_empty() {
311 return Vec::new();
312 }
313
314 let alpha = 1.2f32;
315 let mut sorted = candidates.to_vec();
316 sorted.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
317
318 let mut pruned = Vec::new();
319
320 for &(cand_id, cand_dist) in &sorted {
321 if cand_id as usize == node_idx {
322 continue;
323 }
324
325 let mut ok = true;
326 for &sel in &pruned {
327 let d = dist.eval(
328 &self.vectors[cand_id as usize],
329 &self.vectors[sel as usize],
330 );
331 if d < alpha * cand_dist {
332 ok = false;
333 break;
334 }
335 }
336
337 if ok {
338 pruned.push(cand_id);
339 if pruned.len() >= self.max_degree {
340 break;
341 }
342 }
343 }
344
345 pruned
346 }
347
348 fn search<D: Distance<f32> + Copy>(
349 &self,
350 query: &[f32],
351 k: usize,
352 beam_width: usize,
353 dist: D,
354 ) -> Vec<(u64, f32)> {
355 if self.vectors.is_empty() {
356 return Vec::new();
357 }
358
359 let entry = self.entry_point.unwrap_or(0) as usize;
360 let mut results = self.greedy_search_internal(query, entry, beam_width, dist);
361 results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
362 results.truncate(k);
363
364 results.into_iter()
366 .map(|(local_id, d)| (DELTA_ID_OFFSET + local_id as u64, d))
367 .collect()
368 }
369
370 fn get_vector(&self, local_idx: usize) -> Option<&Vec<f32>> {
371 self.vectors.get(local_idx)
372 }
373}
374
/// Ids at or above this offset address the in-memory delta layer; ids below
/// it index the on-disk base. Leaves 2^48 id slots for each side.
const DELTA_ID_OFFSET: u64 = 1u64 << 48;

/// Returns `true` if `id` belongs to the delta layer rather than the base.
#[inline]
pub fn is_delta_id(id: u64) -> bool {
    id >= DELTA_ID_OFFSET
}

/// Converts a delta-layer global id back to its local slot index.
///
/// Callers must pass an id for which [`is_delta_id`] is true; debug builds
/// assert this explicitly (the subtraction would otherwise overflow, wrapping
/// silently in release builds).
#[inline]
pub fn delta_local_idx(id: u64) -> usize {
    debug_assert!(is_delta_id(id), "delta_local_idx called with base-layer id {}", id);
    (id - DELTA_ID_OFFSET) as usize
}
389
/// DiskANN index with incremental updates: an immutable on-disk base index,
/// an in-memory delta graph for newly added vectors, and a tombstone set for
/// lazy deletes. Compaction folds all three back into a single base index.
pub struct IncrementalDiskANN<D>
where
    D: Distance<f32> + Send + Sync + Copy + Clone + 'static,
{
    // Immutable base index; `None` when the index was created empty.
    base: Option<DiskANN<D>>,
    // Vectors added since the last compaction.
    delta: RwLock<DeltaLayer>,
    // Global ids (base or delta range) marked as deleted.
    tombstones: RwLock<HashSet<u64>>,
    // Distance functor shared by base and delta searches.
    dist: D,
    // Compaction thresholds and delta-graph parameters.
    config: IncrementalConfig,
    // Path of the current base index file, if any.
    base_path: Option<String>,
    // Vector dimensionality enforced on insert.
    dim: usize,
}
410
411impl<D> IncrementalDiskANN<D>
412where
413 D: Distance<f32> + Send + Sync + Copy + Clone + Default + 'static,
414{
415 pub fn build_default(
417 vectors: &[Vec<f32>],
418 file_path: &str,
419 ) -> Result<Self, DiskAnnError> {
420 Self::build_with_config(vectors, file_path, IncrementalConfig::default())
421 }
422
423 pub fn open(path: &str) -> Result<Self, DiskAnnError> {
425 Self::open_with_config(path, IncrementalConfig::default())
426 }
427}
428
429impl<D> IncrementalDiskANN<D>
430where
431 D: Distance<f32> + Send + Sync + Copy + Clone + 'static,
432{
433 pub fn build_with_config(
435 vectors: &[Vec<f32>],
436 file_path: &str,
437 config: IncrementalConfig,
438 ) -> Result<Self, DiskAnnError>
439 where
440 D: Default,
441 {
442 let dist = D::default();
443 let dim = vectors.first().map(|v| v.len()).unwrap_or(0);
444
445 let base = DiskANN::<D>::build_index_default(vectors, dist, file_path)?;
446
447 Ok(Self {
448 base: Some(base),
449 delta: RwLock::new(DeltaLayer::new(config.delta_params.max_degree)),
450 tombstones: RwLock::new(HashSet::new()),
451 dist,
452 config,
453 base_path: Some(file_path.to_string()),
454 dim,
455 })
456 }
457
458 pub fn open_with_config(path: &str, config: IncrementalConfig) -> Result<Self, DiskAnnError>
460 where
461 D: Default,
462 {
463 let dist = D::default();
464 let base = DiskANN::<D>::open_index_default_metric(path)?;
465 let dim = base.dim;
466
467 Ok(Self {
468 base: Some(base),
469 delta: RwLock::new(DeltaLayer::new(config.delta_params.max_degree)),
470 tombstones: RwLock::new(HashSet::new()),
471 dist,
472 config,
473 base_path: Some(path.to_string()),
474 dim,
475 })
476 }
477
478 pub fn new_empty(dim: usize, dist: D, config: IncrementalConfig) -> Self {
480 Self {
481 base: None,
482 delta: RwLock::new(DeltaLayer::new(config.delta_params.max_degree)),
483 tombstones: RwLock::new(HashSet::new()),
484 dist,
485 config,
486 base_path: None,
487 dim,
488 }
489 }
490
491 pub fn add_vectors(&self, vectors: &[Vec<f32>]) -> Result<Vec<u64>, DiskAnnError> {
493 if vectors.is_empty() {
494 return Ok(Vec::new());
495 }
496
497 for (i, v) in vectors.iter().enumerate() {
499 if v.len() != self.dim {
500 return Err(DiskAnnError::IndexError(format!(
501 "Vector {} has dimension {} but index expects {}",
502 i, v.len(), self.dim
503 )));
504 }
505 }
506
507 let mut delta = self.delta.write().unwrap();
508 let ids = delta.add_vectors(vectors, self.dist);
509 Ok(ids)
510 }
511
512 pub fn delete_vectors(&self, ids: &[u64]) -> Result<(), DiskAnnError> {
514 let mut tombstones = self.tombstones.write().unwrap();
515 for &id in ids {
516 tombstones.insert(id);
517 }
518 Ok(())
519 }
520
521 pub fn is_deleted(&self, id: u64) -> bool {
523 self.tombstones.read().unwrap().contains(&id)
524 }
525
526 pub fn search(&self, query: &[f32], k: usize, beam_width: usize) -> Vec<u64> {
528 self.search_with_dists(query, k, beam_width)
529 .into_iter()
530 .map(|(id, _)| id)
531 .collect()
532 }
533
534 pub fn search_with_dists(&self, query: &[f32], k: usize, beam_width: usize) -> Vec<(u64, f32)> {
536 let tombstones = self.tombstones.read().unwrap();
537 let delta = self.delta.read().unwrap();
538
539 let mut all_candidates: Vec<(u64, f32)> = Vec::with_capacity(k * 2);
541
542 if let Some(ref base) = self.base {
544 let base_results = base.search_with_dists(query, k + tombstones.len(), beam_width);
545 for (id, dist) in base_results {
546 let global_id = id as u64;
547 if !tombstones.contains(&global_id) {
548 all_candidates.push((global_id, dist));
549 }
550 }
551 }
552
553 if !delta.is_empty() {
555 let delta_results = delta.search(query, k, beam_width, self.dist);
556 for (id, dist) in delta_results {
557 if !tombstones.contains(&id) {
558 all_candidates.push((id, dist));
559 }
560 }
561 }
562
563 all_candidates.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
565 all_candidates.truncate(k);
566 all_candidates
567 }
568
569 pub fn search_batch(
571 &self,
572 queries: &[Vec<f32>],
573 k: usize,
574 beam_width: usize,
575 ) -> Vec<Vec<u64>> {
576 queries
577 .par_iter()
578 .map(|q| self.search(q, k, beam_width))
579 .collect()
580 }
581
582 pub fn get_vector(&self, id: u64) -> Option<Vec<f32>> {
584 if is_delta_id(id) {
585 let delta = self.delta.read().unwrap();
586 delta.get_vector(delta_local_idx(id)).cloned()
587 } else if let Some(ref base) = self.base {
588 let idx = id as usize;
589 if idx < base.num_vectors {
590 Some(base.get_vector(idx))
591 } else {
592 None
593 }
594 } else {
595 None
596 }
597 }
598
599 pub fn should_compact(&self) -> bool {
601 let delta = self.delta.read().unwrap();
602 let tombstones = self.tombstones.read().unwrap();
603
604 let base_size = self.base.as_ref().map(|b| b.num_vectors).unwrap_or(0);
605 let total_size = base_size + delta.len();
606
607 if delta.len() >= self.config.delta_threshold {
609 return true;
610 }
611
612 if total_size > 0 {
614 let tombstone_ratio = tombstones.len() as f32 / total_size as f32;
615 if tombstone_ratio >= self.config.tombstone_ratio_threshold {
616 return true;
617 }
618 }
619
620 false
621 }
622
623 pub fn compact(&mut self, new_path: &str) -> Result<(), DiskAnnError>
625 where
626 D: Default,
627 {
628 let tombstones = self.tombstones.read().unwrap().clone();
629 let delta = self.delta.read().unwrap();
630
631 let mut all_vectors: Vec<Vec<f32>> = Vec::new();
633
634 if let Some(ref base) = self.base {
636 for i in 0..base.num_vectors {
637 if !tombstones.contains(&(i as u64)) {
638 all_vectors.push(base.get_vector(i));
639 }
640 }
641 }
642
643 for (i, v) in delta.vectors.iter().enumerate() {
645 let global_id = DELTA_ID_OFFSET + i as u64;
646 if !tombstones.contains(&global_id) {
647 all_vectors.push(v.clone());
648 }
649 }
650
651 drop(delta);
652 drop(tombstones);
653
654 if all_vectors.is_empty() {
655 return Err(DiskAnnError::IndexError(
656 "Cannot compact: no vectors remaining after removing tombstones".to_string()
657 ));
658 }
659
660 let new_base = DiskANN::<D>::build_index_default(&all_vectors, self.dist, new_path)?;
662
663 self.base = Some(new_base);
665 self.delta = RwLock::new(DeltaLayer::new(self.config.delta_params.max_degree));
666 self.tombstones = RwLock::new(HashSet::new());
667 self.base_path = Some(new_path.to_string());
668
669 Ok(())
670 }
671
672 pub fn stats(&self) -> IncrementalStats {
674 let delta = self.delta.read().unwrap();
675 let tombstones = self.tombstones.read().unwrap();
676 let base_count = self.base.as_ref().map(|b| b.num_vectors).unwrap_or(0);
677
678 IncrementalStats {
679 base_vectors: base_count,
680 delta_vectors: delta.len(),
681 tombstones: tombstones.len(),
682 total_live: base_count + delta.len() - tombstones.len(),
683 dim: self.dim,
684 }
685 }
686
687 pub fn dim(&self) -> usize {
689 self.dim
690 }
691}
692
/// Point-in-time size counters reported by `IncrementalDiskANN::stats`.
#[derive(Debug, Clone)]
pub struct IncrementalStats {
    // Vectors in the on-disk base index.
    pub base_vectors: usize,
    // Vectors in the in-memory delta layer.
    pub delta_vectors: usize,
    // Ids currently tombstoned (base and delta combined).
    pub tombstones: usize,
    // base + delta minus tombstones, as computed by `stats`.
    pub total_live: usize,
    // Vector dimensionality of the index.
    pub dim: usize,
}
702
#[cfg(test)]
mod tests {
    use super::*;
    use anndists::dist::DistL2;
    use std::fs;

    /// Plain Euclidean distance, used to sanity-check returned vectors.
    fn euclid(a: &[f32], b: &[f32]) -> f32 {
        let sq_sum: f32 = a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum();
        sq_sum.sqrt()
    }

    #[test]
    fn test_incremental_basic() {
        let path = "test_incremental_basic.db";
        let _ = fs::remove_file(path);

        let corpus = vec![
            vec![0.0, 0.0],
            vec![1.0, 0.0],
            vec![0.0, 1.0],
            vec![1.0, 1.0],
        ];
        let index = IncrementalDiskANN::<DistL2>::build_default(&corpus, path).unwrap();

        // Base-only search should return exactly k hits.
        let hits = index.search(&[0.1, 0.1], 2, 8);
        assert_eq!(hits.len(), 2);

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_incremental_add() {
        let path = "test_incremental_add.db";
        let _ = fs::remove_file(path);

        let corpus = vec![vec![0.0, 0.0], vec![1.0, 0.0]];
        let index = IncrementalDiskANN::<DistL2>::build_default(&corpus, path).unwrap();

        // Insert two fresh vectors; they must receive delta-range ids.
        let additions = vec![vec![0.5, 0.5], vec![2.0, 2.0]];
        let assigned = index.add_vectors(&additions).unwrap();
        assert_eq!(assigned.len(), 2);
        assert!(is_delta_id(assigned[0]));

        // Searching at the inserted point should find it (near-zero distance).
        let hits = index.search_with_dists(&[0.5, 0.5], 1, 8);
        assert!(!hits.is_empty());
        let (_best_id, best_dist) = hits[0];
        assert!(best_dist < 0.01, "Expected to find [0.5, 0.5], got dist {}", best_dist);

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_incremental_delete() {
        let path = "test_incremental_delete.db";
        let _ = fs::remove_file(path);

        let corpus = vec![vec![0.0, 0.0], vec![1.0, 0.0], vec![0.0, 1.0]];
        let index = IncrementalDiskANN::<DistL2>::build_default(&corpus, path).unwrap();

        // Tombstone id 0 and verify it is filtered from search results.
        index.delete_vectors(&[0]).unwrap();
        assert!(index.is_deleted(0));

        let hits = index.search(&[0.0, 0.0], 3, 8);
        assert!(!hits.contains(&0), "Deleted vector should not appear in results");

        let _ = fs::remove_file(path);
    }

    #[test]
    fn test_incremental_compact() {
        let path1 = "test_compact_v1.db";
        let path2 = "test_compact_v2.db";
        let _ = fs::remove_file(path1);
        let _ = fs::remove_file(path2);

        let corpus = vec![
            vec![0.0, 0.0],
            vec![1.0, 0.0],
            vec![0.0, 1.0],
            vec![1.0, 1.0],
        ];
        let mut index = IncrementalDiskANN::<DistL2>::build_default(&corpus, path1).unwrap();

        // Two delta inserts plus two base deletes before compaction.
        index.add_vectors(&[vec![2.0, 2.0], vec![3.0, 3.0]]).unwrap();
        index.delete_vectors(&[0, 1]).unwrap();

        let before = index.stats();
        assert_eq!(before.base_vectors, 4);
        assert_eq!(before.delta_vectors, 2);
        assert_eq!(before.tombstones, 2);

        index.compact(path2).unwrap();

        // 4 base + 2 delta - 2 tombstones = 4 live vectors in the new base.
        let after = index.stats();
        assert_eq!(after.base_vectors, 4);
        assert_eq!(after.delta_vectors, 0);
        assert_eq!(after.tombstones, 0);

        // A formerly-delta vector must still be findable after compaction.
        let hits = index.search(&[2.0, 2.0], 1, 8);
        assert!(!hits.is_empty());

        let _ = fs::remove_file(path1);
        let _ = fs::remove_file(path2);
    }

    #[test]
    fn test_delta_only() {
        // No base index at all: everything lives in the delta layer.
        let index =
            IncrementalDiskANN::<DistL2>::new_empty(2, DistL2 {}, IncrementalConfig::default());

        let corpus = vec![
            vec![0.0, 0.0],
            vec![1.0, 0.0],
            vec![0.0, 1.0],
            vec![1.0, 1.0],
            vec![0.5, 0.5],
        ];
        index.add_vectors(&corpus).unwrap();

        let hits = index.search_with_dists(&[0.5, 0.5], 3, 8);
        assert_eq!(hits.len(), 3);

        // The top hit's stored vector should sit at the query point.
        let best_vec = index.get_vector(hits[0].0).unwrap();
        let dist = euclid(&best_vec, &[0.5, 0.5]);
        assert!(dist < 0.01);
    }
}
856}