1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::sync::OnceLock;
6
7use ahash::AHashSet;
8
9use frankensearch_core::filter::SearchFilter;
10use frankensearch_core::{SearchError, SearchResult, VectorHit};
11use rayon::prelude::*;
12
13use crate::wal::{from_wal_index, is_wal_index, to_wal_index};
14use crate::{
15 Quantization, VectorIndex, dot_product_f16_bytes_f32, dot_product_f32_bytes_f32,
16 dot_product_f32_f32,
17};
18
/// Minimum number of main-index records at which the default search switches
/// from a sequential scan to the parallel (rayon) chunked scan.
pub const PARALLEL_THRESHOLD: usize = 10_000;

/// Number of main-index records scanned by each parallel work item.
pub const PARALLEL_CHUNK_SIZE: usize = 1_024;
23
/// Parallelism settings for [`VectorIndex::search_top_k_with_params`].
///
/// The type is plain `Copy` data, so it also derives `PartialEq`/`Eq`. This lets
/// callers compare or deduplicate configurations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SearchParams {
    /// Minimum number of main-index records at which the parallel scan is used.
    pub parallel_threshold: usize,
    /// Records handled per parallel work item; values below 1 are treated as 1.
    pub parallel_chunk_size: usize,
    /// Whether the parallel scan may be used at all.
    pub parallel_enabled: bool,
}
41
42impl Default for SearchParams {
43 fn default() -> Self {
44 Self {
45 parallel_threshold: PARALLEL_THRESHOLD,
46 parallel_chunk_size: PARALLEL_CHUNK_SIZE,
47 parallel_enabled: parallel_search_enabled(),
48 }
49 }
50}
51
/// Process-wide memo of the parsed `FRANKENSEARCH_PARALLEL_SEARCH` setting, filled
/// on first use so the environment is read at most once.
static PARALLEL_SEARCH_ENABLED_CACHE: OnceLock<bool> = OnceLock::new();
53
54#[derive(Debug, Clone, Copy)]
55struct HeapEntry {
56 index: usize,
57 score: f32,
58}
59
60impl HeapEntry {
61 const fn new(index: usize, score: f32) -> Self {
62 Self { index, score }
63 }
64}
65
66impl PartialEq for HeapEntry {
67 fn eq(&self, other: &Self) -> bool {
68 self.index == other.index && self.score.to_bits() == other.score.to_bits()
69 }
70}
71
72impl Eq for HeapEntry {}
73
74impl PartialOrd for HeapEntry {
75 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76 Some(self.cmp(other))
77 }
78}
79
80impl Ord for HeapEntry {
81 fn cmp(&self, other: &Self) -> Ordering {
82 match score_key(self.score).total_cmp(&score_key(other.score)) {
85 Ordering::Less => Ordering::Greater,
86 Ordering::Greater => Ordering::Less,
87 Ordering::Equal => self.index.cmp(&other.index),
88 }
89 }
90}
91
92impl VectorIndex {
93 pub fn search_top_k(
104 &self,
105 query: &[f32],
106 limit: usize,
107 filter: Option<&dyn SearchFilter>,
108 ) -> SearchResult<Vec<VectorHit>> {
109 self.search_top_k_internal(
110 query,
111 limit,
112 filter,
113 PARALLEL_THRESHOLD,
114 PARALLEL_CHUNK_SIZE,
115 parallel_search_enabled(),
116 )
117 }
118
119 pub fn search_top_k_with_params(
130 &self,
131 query: &[f32],
132 limit: usize,
133 filter: Option<&dyn SearchFilter>,
134 params: SearchParams,
135 ) -> SearchResult<Vec<VectorHit>> {
136 self.search_top_k_internal(
137 query,
138 limit,
139 filter,
140 params.parallel_threshold,
141 params.parallel_chunk_size,
142 params.parallel_enabled,
143 )
144 }
145
146 fn search_top_k_internal(
147 &self,
148 query: &[f32],
149 limit: usize,
150 filter: Option<&dyn SearchFilter>,
151 parallel_threshold: usize,
152 parallel_chunk_size: usize,
153 parallel_enabled: bool,
154 ) -> SearchResult<Vec<VectorHit>> {
155 self.ensure_query_dimension(query)?;
156 let has_main = self.record_count() > 0;
157 let has_wal = !self.wal_entries.is_empty();
158 if limit == 0 || (!has_main && !has_wal) {
159 return Ok(Vec::new());
160 }
161 let chunk_size = parallel_chunk_size.max(1);
162 let use_parallel = parallel_enabled && self.record_count() >= parallel_threshold;
163 let total_candidate_upper_bound =
164 self.record_count().saturating_add(self.wal_entries.len());
165
166 if filter.is_none() && limit >= total_candidate_upper_bound {
170 let mut winners = if has_main {
171 if use_parallel {
172 self.scan_parallel_collect_all(query, chunk_size)?
173 } else {
174 self.scan_range_collect_all(0, self.record_count(), query)?
175 }
176 } else {
177 Vec::new()
178 };
179 if has_wal {
180 self.scan_wal_collect_all(query, &mut winners)?;
181 }
182 winners.sort_by(compare_best_first);
183 return self.resolve_sorted_entries(winners);
184 }
185
186 let mut heap = if has_main {
187 if use_parallel {
188 self.scan_parallel(query, limit, filter, chunk_size)?
189 } else {
190 self.scan_sequential(query, limit, filter)?
191 }
192 } else {
193 let max_wal = self.wal_entries.len();
194 BinaryHeap::with_capacity(limit.min(max_wal).saturating_add(1))
195 };
196
197 if has_wal {
199 self.scan_wal(query, &mut heap, limit, filter)?;
200 }
201
202 self.resolve_hits(heap)
203 }
204
205 fn scan_sequential(
206 &self,
207 query: &[f32],
208 limit: usize,
209 filter: Option<&dyn SearchFilter>,
210 ) -> SearchResult<BinaryHeap<HeapEntry>> {
211 filter.map_or_else(
213 || self.scan_range_chunk(0, self.record_count(), query, limit),
214 |filter| self.scan_range_chunk_filtered(0, self.record_count(), query, limit, filter),
215 )
216 }
217
218 fn scan_parallel(
219 &self,
220 query: &[f32],
221 limit: usize,
222 filter: Option<&dyn SearchFilter>,
223 chunk_size: usize,
224 ) -> SearchResult<BinaryHeap<HeapEntry>> {
225 let chunk_count = self.record_count().div_ceil(chunk_size);
226 let partial_heaps: SearchResult<Vec<BinaryHeap<HeapEntry>>> = (0..chunk_count)
227 .into_par_iter()
228 .map(|chunk_index| {
229 let start = chunk_index * chunk_size;
230 let end = (start + chunk_size).min(self.record_count());
231 filter.map_or_else(
232 || self.scan_range_chunk(start, end, query, limit),
233 |active_filter| {
234 self.scan_range_chunk_filtered(start, end, query, limit, active_filter)
235 },
236 )
237 })
238 .collect();
239
240 Ok(merge_partial_heaps(partial_heaps?, limit))
241 }
242
243 fn scan_parallel_collect_all(
244 &self,
245 query: &[f32],
246 chunk_size: usize,
247 ) -> SearchResult<Vec<HeapEntry>> {
248 let chunk_count = self.record_count().div_ceil(chunk_size);
249 let partial: SearchResult<Vec<Vec<HeapEntry>>> = (0..chunk_count)
250 .into_par_iter()
251 .map(|chunk_index| {
252 let start = chunk_index * chunk_size;
253 let end = (start + chunk_size).min(self.record_count());
254 self.scan_range_collect_all(start, end, query)
255 })
256 .collect();
257
258 let partial = partial?;
259 let total = partial.iter().map(std::vec::Vec::len).sum();
260 let mut merged = Vec::with_capacity(total);
261 for mut chunk in partial {
262 merged.append(&mut chunk);
263 }
264 Ok(merged)
265 }
266
267 fn scan_range_collect_all(
268 &self,
269 start: usize,
270 end: usize,
271 query: &[f32],
272 ) -> SearchResult<Vec<HeapEntry>> {
273 let mut winners = Vec::with_capacity(end.saturating_sub(start));
274 let dim = self.dimension();
275
276 match self.quantization() {
277 Quantization::F16 => {
278 let stride = dim * 2;
279 let mut flags_offset = self.records_offset + start * 16 + 14;
280 let mut vector_offset = self.vectors_offset + start * stride;
281
282 for index in start..end {
283 let flags_bytes = &self.data[flags_offset..flags_offset + 2];
284 let flags = u16::from_le_bytes([flags_bytes[0], flags_bytes[1]]);
285
286 if (flags & 0x0001) == 0 {
287 let vector_bytes = &self.data[vector_offset..vector_offset + stride];
288 let score = dot_product_f16_bytes_f32(vector_bytes, query)?;
289 winners.push(HeapEntry::new(index, score));
290 }
291
292 flags_offset += 16;
293 vector_offset += stride;
294 }
295 }
296 Quantization::F32 => {
297 let stride = dim * 4;
298 let mut flags_offset = self.records_offset + start * 16 + 14;
299 let mut vector_offset = self.vectors_offset + start * stride;
300
301 for index in start..end {
302 let flags_bytes = &self.data[flags_offset..flags_offset + 2];
303 let flags = u16::from_le_bytes([flags_bytes[0], flags_bytes[1]]);
304
305 if (flags & 0x0001) == 0 {
306 let vector_bytes = &self.data[vector_offset..vector_offset + stride];
307 let score = dot_product_f32_bytes_f32(vector_bytes, query)?;
308 winners.push(HeapEntry::new(index, score));
309 }
310
311 flags_offset += 16;
312 vector_offset += stride;
313 }
314 }
315 }
316 Ok(winners)
317 }
318
319 fn scan_range_chunk(
320 &self,
321 start: usize,
322 end: usize,
323 query: &[f32],
324 limit: usize,
325 ) -> SearchResult<BinaryHeap<HeapEntry>> {
326 let max_elements = end.saturating_sub(start);
327 let mut heap = BinaryHeap::with_capacity(limit.min(max_elements).saturating_add(1));
328 let dim = self.dimension();
329 let mut cutoff = f32::NEG_INFINITY;
330
331 match self.quantization() {
332 Quantization::F16 => {
333 let stride = dim * 2;
334 let mut flags_offset = self.records_offset + start * 16 + 14;
336 let mut vector_offset = self.vectors_offset + start * stride;
337
338 for index in start..end {
339 let flags_bytes = &self.data[flags_offset..flags_offset + 2];
342 let flags = u16::from_le_bytes([flags_bytes[0], flags_bytes[1]]);
343
344 if (flags & 0x0001) == 0 {
345 let vector_bytes = &self.data[vector_offset..vector_offset + stride];
346 let score = dot_product_f16_bytes_f32(vector_bytes, query)?;
347 if heap.len() < limit || score_key(score) >= cutoff {
348 insert_candidate(&mut heap, HeapEntry::new(index, score), limit);
349 if heap.len() >= limit
350 && let Some(&worst) = heap.peek()
351 {
352 cutoff = score_key(worst.score);
353 }
354 }
355 }
356
357 flags_offset += 16;
358 vector_offset += stride;
359 }
360 }
361 Quantization::F32 => {
362 let stride = dim * 4;
363 let mut flags_offset = self.records_offset + start * 16 + 14;
364 let mut vector_offset = self.vectors_offset + start * stride;
365
366 for index in start..end {
367 let flags_bytes = &self.data[flags_offset..flags_offset + 2];
368 let flags = u16::from_le_bytes([flags_bytes[0], flags_bytes[1]]);
369
370 if (flags & 0x0001) == 0 {
371 let vector_bytes = &self.data[vector_offset..vector_offset + stride];
372 let score = dot_product_f32_bytes_f32(vector_bytes, query)?;
373 if heap.len() < limit || score_key(score) >= cutoff {
374 insert_candidate(&mut heap, HeapEntry::new(index, score), limit);
375 if heap.len() >= limit
376 && let Some(&worst) = heap.peek()
377 {
378 cutoff = score_key(worst.score);
379 }
380 }
381 }
382
383 flags_offset += 16;
384 vector_offset += stride;
385 }
386 }
387 }
388 Ok(heap)
389 }
390
391 fn scan_range_chunk_filtered(
392 &self,
393 start: usize,
394 end: usize,
395 query: &[f32],
396 limit: usize,
397 filter: &dyn SearchFilter,
398 ) -> SearchResult<BinaryHeap<HeapEntry>> {
399 let max_elements = end.saturating_sub(start);
400 let mut heap = BinaryHeap::with_capacity(limit.min(max_elements).saturating_add(1));
401 let dim = self.dimension();
402 let mut cutoff = f32::NEG_INFINITY;
403
404 match self.quantization() {
405 Quantization::F16 => {
406 let stride = dim * 2;
407 let mut record_offset = self.records_offset + start * 16;
408 let mut vector_offset = self.vectors_offset + start * stride;
409
410 for index in start..end {
411 let flags_bytes = &self.data[record_offset + 14..record_offset + 16];
412 let flags = u16::from_le_bytes([flags_bytes[0], flags_bytes[1]]);
413
414 if (flags & 0x0001) != 0 {
415 record_offset += 16;
416 vector_offset += stride;
417 continue;
418 }
419
420 let hash_bytes = &self.data[record_offset..record_offset + 8];
421 let hash = u64::from_le_bytes([
422 hash_bytes[0],
423 hash_bytes[1],
424 hash_bytes[2],
425 hash_bytes[3],
426 hash_bytes[4],
427 hash_bytes[5],
428 hash_bytes[6],
429 hash_bytes[7],
430 ]);
431
432 let passed = if let Some(matches) = filter.matches_doc_id_hash(hash, None) {
433 matches
434 } else {
435 let doc_id = self.doc_id_at(index)?;
436 filter.matches(doc_id, None)
437 };
438
439 if passed {
440 let vector_bytes = &self.data[vector_offset..vector_offset + stride];
441 let score = dot_product_f16_bytes_f32(vector_bytes, query)?;
442 if heap.len() < limit || score_key(score) >= cutoff {
443 insert_candidate(&mut heap, HeapEntry::new(index, score), limit);
444 if heap.len() >= limit
445 && let Some(&worst) = heap.peek()
446 {
447 cutoff = score_key(worst.score);
448 }
449 }
450 }
451
452 record_offset += 16;
453 vector_offset += stride;
454 }
455 }
456 Quantization::F32 => {
457 let stride = dim * 4;
458 let mut record_offset = self.records_offset + start * 16;
459 let mut vector_offset = self.vectors_offset + start * stride;
460
461 for index in start..end {
462 let flags_bytes = &self.data[record_offset + 14..record_offset + 16];
463 let flags = u16::from_le_bytes([flags_bytes[0], flags_bytes[1]]);
464
465 if (flags & 0x0001) != 0 {
466 record_offset += 16;
467 vector_offset += stride;
468 continue;
469 }
470
471 let hash_bytes = &self.data[record_offset..record_offset + 8];
472 let hash = u64::from_le_bytes([
473 hash_bytes[0],
474 hash_bytes[1],
475 hash_bytes[2],
476 hash_bytes[3],
477 hash_bytes[4],
478 hash_bytes[5],
479 hash_bytes[6],
480 hash_bytes[7],
481 ]);
482
483 let passed = if let Some(matches) = filter.matches_doc_id_hash(hash, None) {
484 matches
485 } else {
486 let doc_id = self.doc_id_at(index)?;
487 filter.matches(doc_id, None)
488 };
489
490 if passed {
491 let vector_bytes = &self.data[vector_offset..vector_offset + stride];
492 let score = dot_product_f32_bytes_f32(vector_bytes, query)?;
493 if heap.len() < limit || score_key(score) >= cutoff {
494 insert_candidate(&mut heap, HeapEntry::new(index, score), limit);
495 if heap.len() >= limit
496 && let Some(&worst) = heap.peek()
497 {
498 cutoff = score_key(worst.score);
499 }
500 }
501 }
502
503 record_offset += 16;
504 vector_offset += stride;
505 }
506 }
507 }
508 Ok(heap)
509 }
510
511 fn scan_wal(
512 &self,
513 query: &[f32],
514 heap: &mut BinaryHeap<HeapEntry>,
515 limit: usize,
516 filter: Option<&dyn SearchFilter>,
517 ) -> SearchResult<()> {
518 for (idx, entry) in self.wal_entries.iter().enumerate() {
519 if let Some(f) = filter {
520 if let Some(matches) = f.matches_doc_id_hash(entry.doc_id_hash, None) {
521 if !matches {
522 continue;
523 }
524 } else if !f.matches(&entry.doc_id, None) {
525 continue;
526 }
527 }
528 let score = dot_product_f32_f32(&entry.embedding, query)?;
529 if !score.is_finite() {
532 continue;
533 }
534 insert_candidate(heap, HeapEntry::new(to_wal_index(idx), score), limit);
535 }
536 Ok(())
537 }
538
539 fn scan_wal_collect_all(
540 &self,
541 query: &[f32],
542 winners: &mut Vec<HeapEntry>,
543 ) -> SearchResult<()> {
544 winners.reserve(self.wal_entries.len());
545 for (idx, entry) in self.wal_entries.iter().enumerate() {
546 let score = dot_product_f32_f32(&entry.embedding, query)?;
547 if !score.is_finite() {
548 continue;
549 }
550 winners.push(HeapEntry::new(to_wal_index(idx), score));
551 }
552 Ok(())
553 }
554
555 fn resolve_hits(&self, heap: BinaryHeap<HeapEntry>) -> SearchResult<Vec<VectorHit>> {
556 if heap.is_empty() {
557 return Ok(Vec::new());
558 }
559
560 let mut winners = heap.into_vec();
561 winners.sort_by(compare_best_first);
562 self.resolve_sorted_entries(winners)
563 }
564
565 fn resolve_sorted_entries(&self, winners: Vec<HeapEntry>) -> SearchResult<Vec<VectorHit>> {
566 let wal_hashes: AHashSet<u64> = self.wal_entries.iter().map(|e| e.doc_id_hash).collect();
570
571 let mut seen: AHashSet<String> = AHashSet::with_capacity(winners.len());
572 let mut hits = Vec::with_capacity(winners.len());
573 for winner in winners {
574 if is_wal_index(winner.index) {
575 let wal_idx = from_wal_index(winner.index);
576 let doc_id = &self.wal_entries[wal_idx].doc_id;
577 if !seen.insert(doc_id.clone()) {
579 continue;
580 }
581 hits.push(self.resolve_wal_hit(&winner)?);
582 } else {
583 if self.is_deleted(winner.index) {
585 continue;
586 }
587 let doc_id = self.doc_id_at(winner.index)?.to_owned();
588 let record = self.record_at(winner.index)?;
590 let doc_id_hash = record.doc_id_hash;
591 if wal_hashes.contains(&doc_id_hash) {
593 let has_wal_entry = self
594 .wal_entries
595 .iter()
596 .any(|e| e.doc_id_hash == doc_id_hash && e.doc_id == doc_id);
597 if has_wal_entry {
598 continue;
599 }
600 }
601 if !seen.insert(doc_id.clone()) {
603 continue;
604 }
605 let index_u32 =
606 u32::try_from(winner.index).map_err(|_| SearchError::InvalidConfig {
607 field: "index".to_owned(),
608 value: winner.index.to_string(),
609 reason: "winner index exceeds u32 range for VectorHit".to_owned(),
610 })?;
611 hits.push(VectorHit {
612 index: index_u32,
613 score: winner.score,
614 doc_id,
615 });
616 }
617 }
618
619 Ok(hits)
620 }
621
622 fn resolve_wal_hit(&self, winner: &HeapEntry) -> SearchResult<VectorHit> {
623 if !is_wal_index(winner.index) {
624 return Err(SearchError::InvalidConfig {
625 field: "index".to_owned(),
626 value: winner.index.to_string(),
627 reason: "winner index is not WAL-encoded".to_owned(),
628 });
629 }
630
631 let wal_idx = from_wal_index(winner.index);
632 let entry = self
633 .wal_entries
634 .get(wal_idx)
635 .ok_or_else(|| SearchError::IndexCorrupted {
636 path: self.path.clone(),
637 detail: format!(
638 "WAL index {} out of bounds (wal_entries.len() = {})",
639 wal_idx,
640 self.wal_entries.len()
641 ),
642 })?;
643 let virtual_index =
644 self.record_count()
645 .checked_add(wal_idx)
646 .ok_or_else(|| SearchError::InvalidConfig {
647 field: "index".to_owned(),
648 value: wal_idx.to_string(),
649 reason: "WAL virtual index overflow".to_owned(),
650 })?;
651 let index_u32 = u32::try_from(virtual_index).map_err(|_| SearchError::InvalidConfig {
652 field: "index".to_owned(),
653 value: virtual_index.to_string(),
654 reason: "WAL entry index exceeds u32 range".to_owned(),
655 })?;
656 Ok(VectorHit {
657 index: index_u32,
658 score: winner.score,
659 doc_id: entry.doc_id.clone(),
660 })
661 }
662
663 #[allow(clippy::missing_const_for_fn)]
664 fn ensure_query_dimension(&self, query: &[f32]) -> SearchResult<()> {
665 if query.len() != self.dimension() {
666 return Err(SearchError::DimensionMismatch {
667 expected: self.dimension(),
668 found: query.len(),
669 });
670 }
671 Ok(())
672 }
673}
674
/// Maps a raw score to its ranking key.
///
/// NaN becomes `f32::NEG_INFINITY`, so it ranks at the very bottom (tied with
/// `-inf`). Every other value, including the signed zeros, is returned unchanged.
pub(crate) const fn score_key(score: f32) -> f32 {
    if score.is_nan() {
        return f32::NEG_INFINITY;
    }
    score
}
682
683fn compare_best_first(left: &HeapEntry, right: &HeapEntry) -> Ordering {
684 match score_key(right.score).total_cmp(&score_key(left.score)) {
685 Ordering::Equal => left.index.cmp(&right.index),
686 other => other,
687 }
688}
689
690fn candidate_is_better(left: HeapEntry, right: HeapEntry) -> bool {
691 match score_key(left.score).total_cmp(&score_key(right.score)) {
692 Ordering::Greater => true,
693 Ordering::Less => false,
694 Ordering::Equal => left.index < right.index,
695 }
696}
697
698fn insert_candidate(heap: &mut BinaryHeap<HeapEntry>, candidate: HeapEntry, limit: usize) {
699 if limit == 0 {
700 return;
701 }
702 if heap.len() < limit {
703 heap.push(candidate);
704 return;
705 }
706 if let Some(&worst) = heap.peek()
707 && candidate_is_better(candidate, worst)
708 {
709 let _ = heap.pop();
710 heap.push(candidate);
711 }
712}
713
714fn merge_partial_heaps(
715 partial_heaps: Vec<BinaryHeap<HeapEntry>>,
716 limit: usize,
717) -> BinaryHeap<HeapEntry> {
718 let mut total_elements = 0_usize;
719 for heap in &partial_heaps {
720 total_elements = total_elements.saturating_add(heap.len());
721 }
722 let capacity = limit.min(total_elements).saturating_add(1);
723 let mut merged = BinaryHeap::with_capacity(capacity);
724 for heap in partial_heaps {
725 for entry in heap {
726 insert_candidate(&mut merged, entry, limit);
727 }
728 }
729 merged
730}
731
732fn parallel_search_enabled() -> bool {
733 *PARALLEL_SEARCH_ENABLED_CACHE.get_or_init(|| {
734 let value = std::env::var("FRANKENSEARCH_PARALLEL_SEARCH").ok();
735 parse_parallel_search_env(value.as_deref())
736 })
737}
738
/// Interprets the raw `FRANKENSEARCH_PARALLEL_SEARCH` value.
///
/// An unset variable means enabled. A value disables parallelism only when,
/// after trimming, it equals `0`, `false`, `no` or `off` (case-insensitively).
/// Anything else, including an empty string, keeps it enabled.
fn parse_parallel_search_env(value: Option<&str>) -> bool {
    const DISABLING_VALUES: [&str; 4] = ["0", "false", "no", "off"];
    let Some(raw) = value else {
        return true;
    };
    let trimmed = raw.trim();
    !DISABLING_VALUES
        .iter()
        .any(|disabling| trimmed.eq_ignore_ascii_case(disabling))
}
748
749#[cfg(test)]
750mod tests {
751 use std::collections::HashSet;
752 use std::fs;
753 use std::path::PathBuf;
754 use std::time::{SystemTime, UNIX_EPOCH};
755
756 use super::*;
757 use crate::{Quantization, VectorIndex};
758 use frankensearch_core::PredicateFilter;
759 use proptest::prelude::*;
760
761 fn temp_index_path(name: &str) -> PathBuf {
762 let now = SystemTime::now()
763 .duration_since(UNIX_EPOCH)
764 .unwrap_or_default()
765 .as_nanos();
766 std::env::temp_dir().join(format!(
767 "frankensearch-index-search-{name}-{}-{now}.fsvi",
768 std::process::id()
769 ))
770 }
771
772 fn write_index(path: &std::path::Path, rows: &[(&str, Vec<f32>)]) -> SearchResult<()> {
773 let dimension =
774 rows.first()
775 .map(|(_, vec)| vec.len())
776 .ok_or_else(|| SearchError::InvalidConfig {
777 field: "rows".to_owned(),
778 value: "[]".to_owned(),
779 reason: "rows must not be empty".to_owned(),
780 })?;
781 let mut writer =
782 VectorIndex::create_with_revision(path, "hash", "test", dimension, Quantization::F16)?;
783 for (doc_id, vector) in rows {
784 writer.write_record(doc_id, vector)?;
785 }
786 writer.finish()
787 }
788
789 fn create_rows(vectors: &[Vec<f32>]) -> Vec<(String, Vec<f32>)> {
790 vectors
791 .iter()
792 .enumerate()
793 .map(|(idx, vector)| (format!("doc-{idx:03}"), vector.clone()))
794 .collect()
795 }
796
797 proptest! {
798 #[test]
799 fn property_top_k_invariants_hold(
800 vectors in prop::collection::vec(prop::collection::vec(-1.0_f32..1.0_f32, 4), 1..20),
801 query in prop::collection::vec(-1.0_f32..1.0_f32, 4),
802 limit in 1_usize..20,
803 ) {
804 let path = temp_index_path("prop-top-k");
805 let rows = create_rows(&vectors);
806 let row_refs: Vec<(&str, Vec<f32>)> = rows
807 .iter()
808 .map(|(doc_id, vector)| (doc_id.as_str(), vector.clone()))
809 .collect();
810 prop_assume!(write_index(&path, &row_refs).is_ok());
813
814 let index = VectorIndex::open(&path).expect("open index");
815 let hits = index.search_top_k(&query, limit, None).expect("search");
816
817 let expected_len = limit.min(vectors.len());
818 prop_assert_eq!(hits.len(), expected_len);
819 let mut seen_indices = HashSet::new();
820 for hit in &hits {
821 prop_assert!(seen_indices.insert(hit.index));
822 }
823 let _ = fs::remove_file(&path);
824 }
825
826 #[test]
827 fn property_parallel_and_sequential_paths_match(
828 vectors in prop::collection::vec(prop::collection::vec(-1.0_f32..1.0_f32, 4), 8..40),
829 query in prop::collection::vec(-1.0_f32..1.0_f32, 4),
830 limit in 1_usize..20,
831 ) {
832 let path = temp_index_path("prop-parallel");
833 let rows = create_rows(&vectors);
834 let row_refs: Vec<(&str, Vec<f32>)> = rows
835 .iter()
836 .map(|(doc_id, vector)| (doc_id.as_str(), vector.clone()))
837 .collect();
838 prop_assume!(write_index(&path, &row_refs).is_ok());
839
840 let index = VectorIndex::open(&path).expect("open index");
841 let sequential = index
842 .search_top_k_internal(&query, limit, None, usize::MAX, PARALLEL_CHUNK_SIZE, true)
843 .expect("sequential search");
844 let parallel = index
845 .search_top_k_internal(&query, limit, None, 1, 4, true)
846 .expect("parallel search");
847
848 prop_assert_eq!(sequential.len(), parallel.len());
849 for (left, right) in sequential.iter().zip(parallel.iter()) {
850 prop_assert_eq!(&left.doc_id, &right.doc_id);
851 prop_assert_eq!(left.index, right.index);
852 prop_assert!((left.score - right.score).abs() <= 1e-6);
853 }
854 let _ = fs::remove_file(&path);
855 }
856 }
857
858 #[test]
859 fn top_k_orders_by_score_descending() {
860 let path = temp_index_path("top-k-order");
861 write_index(
862 &path,
863 &[
864 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
865 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
866 ("doc-c", vec![0.2, 0.0, 0.0, 0.0]),
867 ],
868 )
869 .expect("write index");
870
871 let index = VectorIndex::open(&path).expect("open index");
872 let hits = index
873 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 2, None)
874 .expect("search");
875
876 assert_eq!(hits.len(), 2);
877 assert_eq!(hits[0].doc_id, "doc-a");
878 assert_eq!(hits[1].doc_id, "doc-b");
879 assert!(hits[0].score >= hits[1].score);
880 }
881
882 #[test]
883 fn filter_excludes_matching_doc_id() {
884 let path = temp_index_path("filter");
885 write_index(
886 &path,
887 &[
888 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
889 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
890 ("doc-c", vec![0.2, 0.0, 0.0, 0.0]),
891 ],
892 )
893 .expect("write index");
894
895 let index = VectorIndex::open(&path).expect("open index");
896 let filter = PredicateFilter::new("exclude-a", |doc_id| doc_id != "doc-a");
897 let hits = index
898 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 2, Some(&filter))
899 .expect("search");
900
901 assert_eq!(hits.len(), 2);
902 assert!(hits.iter().all(|hit| hit.doc_id != "doc-a"));
903 }
904
905 #[test]
906 fn tombstoned_records_are_excluded_from_search() {
907 let path = temp_index_path("tombstone-excluded");
908 write_index(
909 &path,
910 &[
911 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
912 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
913 ("doc-c", vec![0.2, 0.0, 0.0, 0.0]),
914 ],
915 )
916 .expect("write index");
917
918 let mut index = VectorIndex::open(&path).expect("open index");
919 assert!(index.soft_delete("doc-a").expect("delete doc-a"));
920
921 let hits = index
922 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, None)
923 .expect("search");
924
925 assert_eq!(hits.len(), 2);
926 assert!(hits.iter().all(|hit| hit.doc_id != "doc-a"));
927 }
928
929 #[test]
930 fn parallel_and_sequential_ignore_tombstones() {
931 let path = temp_index_path("tombstone-parallel");
932 let mut rows = Vec::new();
933 for i in 0..96 {
934 let score = f32::from(u16::try_from(96 - i).expect("test index must fit in u16"));
935 rows.push((format!("doc-{i:03}"), vec![score, 0.0, 0.0, 0.0]));
936 }
937 let refs: Vec<(&str, Vec<f32>)> = rows
938 .iter()
939 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
940 .collect();
941 write_index(&path, &refs).expect("write index");
942
943 let mut index = VectorIndex::open(&path).expect("open index");
944 let deleted = index
945 .soft_delete_batch(&["doc-000", "doc-001", "doc-002", "doc-003"])
946 .expect("batch delete");
947 assert_eq!(deleted, 4);
948
949 let query = [1.0, 0.0, 0.0, 0.0];
950 let sequential = index
951 .search_top_k_internal(&query, 10, None, usize::MAX, PARALLEL_CHUNK_SIZE, true)
952 .expect("sequential");
953 let parallel = index
954 .search_top_k_internal(&query, 10, None, 1, 8, true)
955 .expect("parallel");
956
957 assert_eq!(sequential.len(), parallel.len());
958 let deleted_ids = ["doc-000", "doc-001", "doc-002", "doc-003"];
959 assert!(
960 sequential
961 .iter()
962 .all(|hit| !deleted_ids.contains(&hit.doc_id.as_str()))
963 );
964 assert!(
965 parallel
966 .iter()
967 .all(|hit| !deleted_ids.contains(&hit.doc_id.as_str()))
968 );
969 for (left, right) in sequential.iter().zip(parallel.iter()) {
970 assert_eq!(left.doc_id, right.doc_id);
971 assert!((left.score - right.score).abs() < 1e-6);
972 }
973 }
974
975 #[test]
976 fn parallel_and_sequential_paths_match() {
977 let path = temp_index_path("parallel-match");
978 let mut rows = Vec::new();
979 for i in 0..64 {
980 let rank = f32::from(u16::try_from(i).expect("test index must fit in u16"));
981 rows.push((
982 format!("doc-{i:03}"),
983 vec![rank, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
984 ));
985 }
986 let refs: Vec<(&str, Vec<f32>)> = rows
987 .iter()
988 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
989 .collect();
990 write_index(&path, &refs).expect("write index");
991
992 let index = VectorIndex::open(&path).expect("open index");
993 let query = [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];
994 let sequential = index
995 .search_top_k_internal(&query, 10, None, usize::MAX, PARALLEL_CHUNK_SIZE, true)
996 .expect("sequential search");
997 let parallel = index
998 .search_top_k_internal(&query, 10, None, 1, 4, true)
999 .expect("parallel search");
1000
1001 assert_eq!(sequential.len(), parallel.len());
1002 for (left, right) in sequential.iter().zip(parallel.iter()) {
1003 assert_eq!(left.doc_id, right.doc_id);
1004 assert!((left.score - right.score).abs() < 1e-6);
1005 }
1006 }
1007
1008 #[test]
1009 fn parallel_and_sequential_paths_match_with_filter() {
1010 let path = temp_index_path("parallel-match-filter");
1011 let mut rows = Vec::new();
1012 for i in 0..96 {
1013 let rank = f32::from(u16::try_from(i).expect("test index must fit in u16"));
1014 rows.push((
1015 format!("doc-{i:03}"),
1016 vec![rank, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
1017 ));
1018 }
1019 let refs: Vec<(&str, Vec<f32>)> = rows
1020 .iter()
1021 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
1022 .collect();
1023 write_index(&path, &refs).expect("write index");
1024
1025 let index = VectorIndex::open(&path).expect("open index");
1026 let query = [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];
1027 let filter = PredicateFilter::new("even-docs", |doc_id| {
1028 let suffix = doc_id.strip_prefix("doc-").unwrap_or_default();
1029 suffix.parse::<u32>().is_ok_and(|v| v % 2 == 0)
1030 });
1031
1032 let sequential = index
1033 .search_top_k_internal(
1034 &query,
1035 15,
1036 Some(&filter),
1037 usize::MAX,
1038 PARALLEL_CHUNK_SIZE,
1039 true,
1040 )
1041 .expect("sequential search");
1042 let parallel = index
1043 .search_top_k_internal(&query, 15, Some(&filter), 1, 8, true)
1044 .expect("parallel search");
1045
1046 assert_eq!(sequential.len(), parallel.len());
1047 for (left, right) in sequential.iter().zip(parallel.iter()) {
1048 assert_eq!(left.doc_id, right.doc_id);
1049 assert!((left.score - right.score).abs() < 1e-6);
1050 }
1051 }
1052
1053 #[test]
1054 fn resolves_doc_ids_only_for_winners() {
1055 let path = temp_index_path("two-phase");
1056 write_index(
1057 &path,
1058 &[("winner", vec![1.0, 0.0]), ("loser", vec![0.0, 1.0])],
1059 )
1060 .expect("write index");
1061
1062 let inspect = VectorIndex::open(&path).expect("open index");
1063 let loser_idx = inspect
1064 .find_index_by_doc_hash(super::super::fnv1a_hash(b"loser"))
1065 .expect("loser index");
1066 let entry = inspect.record_at(loser_idx).expect("record");
1067 let loser_offset =
1068 inspect.strings_offset + usize::try_from(entry.doc_id_offset).unwrap_or(0);
1069 drop(inspect);
1070
1071 let mut bytes = fs::read(&path).expect("read bytes");
1072 bytes[loser_offset] = 0xFF;
1073 fs::write(&path, bytes).expect("write corrupt bytes");
1074
1075 let index = VectorIndex::open(&path).expect("open index");
1076 let hits = index
1077 .search_top_k(&[1.0, 0.0], 1, None)
1078 .expect("search should only resolve winner doc_id");
1079 assert_eq!(hits.len(), 1);
1080 assert_eq!(hits[0].doc_id, "winner");
1081 }
1082
1083 #[test]
1084 fn bitset_filter_skips_doc_id_decode_for_non_matching_records() {
1085 let path = temp_index_path("bitset-hash-fast-path");
1086 write_index(
1087 &path,
1088 &[("doc-a", vec![1.0, 0.0]), ("doc-b", vec![0.0, 1.0])],
1089 )
1090 .expect("write index");
1091
1092 let inspect = VectorIndex::open(&path).expect("open index");
1093 let bad_idx = inspect
1094 .find_index_by_doc_hash(super::super::fnv1a_hash(b"doc-b"))
1095 .expect("doc-b index");
1096 let record = inspect.record_at(bad_idx).expect("record");
1097 let bad_offset =
1098 inspect.strings_offset + usize::try_from(record.doc_id_offset).expect("offset");
1099 drop(inspect);
1100
1101 let mut bytes = fs::read(&path).expect("read bytes");
1102 bytes[bad_offset] = 0xFF;
1103 fs::write(&path, bytes).expect("write corrupt bytes");
1104
1105 let index = VectorIndex::open(&path).expect("open index");
1106 let filter = frankensearch_core::BitsetFilter::from_doc_ids(["doc-a"]);
1107 let hits = index
1108 .search_top_k(&[1.0, 0.0], 10, Some(&filter))
1109 .expect("search should ignore corrupted filtered-out doc_id");
1110
1111 assert_eq!(hits.len(), 1);
1112 assert_eq!(hits[0].doc_id, "doc-a");
1113 }
1114
1115 #[test]
1116 fn limit_zero_or_empty_index_returns_no_hits() {
1117 let path = temp_index_path("limit-zero");
1118 let writer = VectorIndex::create_with_revision(&path, "hash", "test", 4, Quantization::F16)
1119 .expect("writer");
1120 writer.finish().expect("finish");
1121
1122 let index = VectorIndex::open(&path).expect("open index");
1123 let zero_limit = index
1124 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 0, None)
1125 .expect("search");
1126 let empty_index = index
1127 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 5, None)
1128 .expect("search");
1129
1130 assert!(zero_limit.is_empty());
1131 assert!(empty_index.is_empty());
1132 }
1133
1134 #[test]
1135 fn k_above_record_count_returns_all_hits() {
1136 let path = temp_index_path("k-above-count");
1137 write_index(
1138 &path,
1139 &[
1140 ("doc-a", vec![0.1, 0.0, 0.0, 0.0]),
1141 ("doc-b", vec![0.2, 0.0, 0.0, 0.0]),
1142 ],
1143 )
1144 .expect("write index");
1145
1146 let index = VectorIndex::open(&path).expect("open index");
1147 let hits = index
1148 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 20, None)
1149 .expect("search");
1150 assert_eq!(hits.len(), 2);
1151 }
1152
1153 #[test]
1154 fn full_recall_collect_all_matches_heap_prefix_main_only() {
1155 let path = temp_index_path("collect-all-main");
1156 let mut rows = Vec::new();
1157 for i in 0..80 {
1158 let score = f32::from(u16::try_from(80 - i).expect("test index must fit in u16"));
1159 rows.push((format!("doc-{i:03}"), vec![score, 0.0, 0.0, 0.0]));
1160 }
1161 let refs: Vec<(&str, Vec<f32>)> = rows
1162 .iter()
1163 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
1164 .collect();
1165 write_index(&path, &refs).expect("write index");
1166
1167 let index = VectorIndex::open(&path).expect("open index");
1168 let query = [1.0, 0.0, 0.0, 0.0];
1169 let total = index.record_count();
1170 let heap_limit = total.saturating_sub(7);
1171
1172 let collect_all = index
1173 .search_top_k_internal(&query, total, None, 1, 8, true)
1174 .expect("collect-all");
1175 let heap_top = index
1176 .search_top_k_internal(
1177 &query,
1178 heap_limit,
1179 None,
1180 usize::MAX,
1181 PARALLEL_CHUNK_SIZE,
1182 true,
1183 )
1184 .expect("heap-top");
1185
1186 assert_eq!(collect_all.len(), total);
1187 assert_eq!(heap_top.len(), heap_limit);
1188 for (heap_hit, full_hit) in heap_top.iter().zip(collect_all.iter()) {
1189 assert_eq!(heap_hit.doc_id, full_hit.doc_id);
1190 assert_eq!(heap_hit.index, full_hit.index);
1191 assert!((heap_hit.score - full_hit.score).abs() < 1e-6);
1192 }
1193 }
1194
1195 #[test]
1196 fn full_recall_collect_all_matches_heap_prefix_with_wal() {
1197 let path = temp_index_path("collect-all-wal");
1198 let mut rows = Vec::new();
1199 for i in 0..48 {
1200 let score = f32::from(u16::try_from(48 - i).expect("test index must fit in u16"));
1201 rows.push((format!("doc-{i:03}"), vec![score, 0.0, 0.0, 0.0]));
1202 }
1203 let refs: Vec<(&str, Vec<f32>)> = rows
1204 .iter()
1205 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
1206 .collect();
1207 write_index(&path, &refs).expect("write index");
1208
1209 let mut index = VectorIndex::open(&path).expect("open index");
1210 index
1211 .append_batch(&[
1212 ("wal-top".to_owned(), vec![200.0, 0.0, 0.0, 0.0]),
1213 ("wal-mid".to_owned(), vec![24.5, 0.0, 0.0, 0.0]),
1214 ("wal-tail".to_owned(), vec![-1.0, 0.0, 0.0, 0.0]),
1215 ])
1216 .expect("append wal batch");
1217
1218 let query = [1.0, 0.0, 0.0, 0.0];
1219 let total = index
1220 .record_count()
1221 .saturating_add(index.wal_record_count());
1222 let heap_limit = total.saturating_sub(5);
1223
1224 let collect_all = index
1225 .search_top_k_internal(&query, total.saturating_add(10), None, 1, 8, true)
1226 .expect("collect-all with wal");
1227 let heap_top = index
1228 .search_top_k_internal(
1229 &query,
1230 heap_limit,
1231 None,
1232 usize::MAX,
1233 PARALLEL_CHUNK_SIZE,
1234 true,
1235 )
1236 .expect("heap-top with wal");
1237
1238 assert_eq!(collect_all.len(), total);
1239 assert_eq!(collect_all[0].doc_id, "wal-top");
1240 assert_eq!(heap_top.len(), heap_limit);
1241 for (heap_hit, full_hit) in heap_top.iter().zip(collect_all.iter()) {
1242 assert_eq!(heap_hit.doc_id, full_hit.doc_id);
1243 assert_eq!(heap_hit.index, full_hit.index);
1244 assert!((heap_hit.score - full_hit.score).abs() < 1e-6);
1245 }
1246 }
1247
1248 #[test]
1249 fn ties_are_broken_by_index() {
1250 let path = temp_index_path("tie-break");
1251 write_index(
1252 &path,
1253 &[
1254 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1255 ("doc-b", vec![1.0, 0.0, 0.0, 0.0]),
1256 ("doc-c", vec![1.0, 0.0, 0.0, 0.0]),
1257 ],
1258 )
1259 .expect("write index");
1260
1261 let index = VectorIndex::open(&path).expect("open index");
1262 let hits = index
1263 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 3, None)
1264 .expect("search");
1265
1266 let mut indexes: Vec<u32> = hits.iter().map(|hit| hit.index).collect();
1267 let mut sorted = indexes.clone();
1268 sorted.sort_unstable();
1269 assert_eq!(indexes, sorted);
1270 assert_eq!(hits.len(), 3);
1271 indexes.clear();
1272 }
1273
1274 #[test]
1275 fn nan_scores_do_not_panic_and_sort_last() {
1276 let path = temp_index_path("nan-safe");
1277 write_index(
1278 &path,
1279 &[
1280 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1281 ("doc-b", vec![0.5, 0.0, 0.0, 0.0]),
1282 ("doc-c", vec![0.2, 0.0, 0.0, 0.0]),
1283 ],
1284 )
1285 .expect("write index");
1286
1287 let index = VectorIndex::open(&path).expect("open index");
1288 let hits = index
1289 .search_top_k(&[f32::NAN, 0.0, 0.0, 0.0], 3, None)
1290 .expect("search");
1291
1292 assert_eq!(hits.len(), 3);
1293 assert!(hits.iter().all(|hit| hit.score.is_nan()));
1294 assert!(hits.windows(2).all(|pair| pair[0].index <= pair[1].index));
1295 }
1296
1297 #[test]
1298 fn parse_parallel_search_env_values() {
1299 assert!(parse_parallel_search_env(None));
1300 assert!(parse_parallel_search_env(Some("1")));
1301 assert!(parse_parallel_search_env(Some("true")));
1302 assert!(parse_parallel_search_env(Some("yes")));
1303 assert!(!parse_parallel_search_env(Some("0")));
1304 assert!(!parse_parallel_search_env(Some("false")));
1305 assert!(!parse_parallel_search_env(Some("no")));
1306 assert!(!parse_parallel_search_env(Some("off")));
1307 }
1308
1309 #[test]
1312 fn bitset_filter_during_vector_search() {
1313 let path = temp_index_path("bitset-filter");
1314 write_index(
1315 &path,
1316 &[
1317 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1318 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
1319 ("doc-c", vec![0.2, 0.0, 0.0, 0.0]),
1320 ],
1321 )
1322 .expect("write index");
1323
1324 let index = VectorIndex::open(&path).expect("open index");
1325 let filter = frankensearch_core::BitsetFilter::from_doc_ids(["doc-a", "doc-c"]);
1326 let hits = index
1327 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, Some(&filter))
1328 .expect("search");
1329
1330 assert_eq!(hits.len(), 2);
1331 assert_eq!(hits[0].doc_id, "doc-a");
1332 assert_eq!(hits[1].doc_id, "doc-c");
1333 }
1334
1335 #[test]
1336 fn filter_chain_and_semantics_in_search() {
1337 let path = temp_index_path("filter-chain-and");
1338 write_index(
1339 &path,
1340 &[
1341 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1342 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
1343 ("doc-c", vec![0.6, 0.0, 0.0, 0.0]),
1344 ],
1345 )
1346 .expect("write index");
1347
1348 let index = VectorIndex::open(&path).expect("open index");
1349 let chain = frankensearch_core::FilterChain::new(frankensearch_core::FilterMode::All)
1350 .with(Box::new(PredicateFilter::new("not-c", |id| id != "doc-c")))
1351 .with(Box::new(PredicateFilter::new("not-a", |id| id != "doc-a")));
1352
1353 let hits = index
1354 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, Some(&chain))
1355 .expect("search");
1356
1357 assert_eq!(hits.len(), 1);
1358 assert_eq!(hits[0].doc_id, "doc-b");
1359 }
1360
1361 #[test]
1362 fn filter_chain_or_semantics_in_search() {
1363 let path = temp_index_path("filter-chain-or");
1364 write_index(
1365 &path,
1366 &[
1367 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1368 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
1369 ("doc-c", vec![0.6, 0.0, 0.0, 0.0]),
1370 ],
1371 )
1372 .expect("write index");
1373
1374 let index = VectorIndex::open(&path).expect("open index");
1375 let chain = frankensearch_core::FilterChain::new(frankensearch_core::FilterMode::Any)
1376 .with(Box::new(PredicateFilter::new("is-a", |id| id == "doc-a")))
1377 .with(Box::new(PredicateFilter::new("is-c", |id| id == "doc-c")));
1378
1379 let hits = index
1380 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, Some(&chain))
1381 .expect("search");
1382
1383 assert_eq!(hits.len(), 2);
1384 assert_eq!(hits[0].doc_id, "doc-a");
1385 assert_eq!(hits[1].doc_id, "doc-c");
1386 }
1387
1388 #[test]
1389 fn filter_rejects_all_returns_empty() {
1390 let path = temp_index_path("filter-all-rejected");
1391 write_index(
1392 &path,
1393 &[
1394 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1395 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
1396 ],
1397 )
1398 .expect("write index");
1399
1400 let index = VectorIndex::open(&path).expect("open index");
1401 let filter = PredicateFilter::new("reject-all", |_| false);
1402 let hits = index
1403 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, Some(&filter))
1404 .expect("search");
1405
1406 assert!(hits.is_empty());
1407 }
1408
1409 #[test]
1410 fn filter_applies_to_wal_entries() {
1411 let path = temp_index_path("filter-wal");
1412 write_index(&path, &[("doc-a", vec![1.0, 0.0, 0.0, 0.0])]).expect("write index");
1413
1414 let mut index = VectorIndex::open(&path).expect("open index");
1415 index
1416 .append("doc-b", &[0.9, 0.0, 0.0, 0.0])
1417 .expect("append doc-b");
1418 index
1419 .append("doc-c", &[0.8, 0.0, 0.0, 0.0])
1420 .expect("append doc-c");
1421
1422 let filter = PredicateFilter::new("only-b", |id| id == "doc-b");
1424 let hits = index
1425 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, Some(&filter))
1426 .expect("search");
1427
1428 assert_eq!(hits.len(), 1);
1429 assert_eq!(hits[0].doc_id, "doc-b");
1430 }
1431
1432 #[test]
1433 fn filter_works_with_wal_and_main_combined() {
1434 let path = temp_index_path("filter-wal-main");
1435 write_index(
1436 &path,
1437 &[
1438 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1439 ("doc-b", vec![0.5, 0.0, 0.0, 0.0]),
1440 ],
1441 )
1442 .expect("write index");
1443
1444 let mut index = VectorIndex::open(&path).expect("open index");
1445 index
1446 .append("doc-c", &[0.9, 0.0, 0.0, 0.0])
1447 .expect("append doc-c");
1448
1449 let filter = frankensearch_core::BitsetFilter::from_doc_ids(["doc-a", "doc-c"]);
1451 let hits = index
1452 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, Some(&filter))
1453 .expect("search");
1454
1455 assert_eq!(hits.len(), 2);
1456 assert_eq!(hits[0].doc_id, "doc-a");
1457 assert_eq!(hits[1].doc_id, "doc-c");
1458 }
1459
1460 #[test]
1461 fn all_records_soft_deleted_returns_empty() {
1462 let path = temp_index_path("all-deleted");
1463 write_index(
1464 &path,
1465 &[
1466 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1467 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
1468 ("doc-c", vec![0.5, 0.0, 0.0, 0.0]),
1469 ],
1470 )
1471 .expect("write index");
1472
1473 let mut index = VectorIndex::open(&path).expect("open index");
1474 let deleted = index
1475 .soft_delete_batch(&["doc-a", "doc-b", "doc-c"])
1476 .expect("batch delete");
1477 assert_eq!(deleted, 3);
1478
1479 let hits = index
1480 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 10, None)
1481 .expect("search");
1482 assert!(
1483 hits.is_empty(),
1484 "search over fully-deleted index should return empty"
1485 );
1486 }
1487
1488 #[test]
1489 fn dimension_mismatch_returns_error() {
1490 let path = temp_index_path("dim-mismatch");
1491 write_index(&path, &[("doc-a", vec![1.0, 0.0, 0.0, 0.0])]).expect("write index");
1492
1493 let index = VectorIndex::open(&path).expect("open index");
1494 let result = index.search_top_k(&[1.0, 0.0], 5, None);
1495 assert!(matches!(
1496 result,
1497 Err(SearchError::DimensionMismatch {
1498 expected: 4,
1499 found: 2
1500 })
1501 ));
1502 }
1503
1504 #[test]
1505 fn wal_only_search_returns_wal_entries() {
1506 let path = temp_index_path("wal-only");
1507 let writer = VectorIndex::create_with_revision(&path, "hash", "test", 4, Quantization::F16)
1508 .expect("writer");
1509 writer.finish().expect("finish");
1510
1511 let mut index = VectorIndex::open(&path).expect("open index");
1512 assert_eq!(index.record_count(), 0);
1513
1514 index
1515 .append("wal-a", &[1.0, 0.0, 0.0, 0.0])
1516 .expect("append wal-a");
1517 index
1518 .append("wal-b", &[0.5, 0.0, 0.0, 0.0])
1519 .expect("append wal-b");
1520
1521 let hits = index
1522 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 5, None)
1523 .expect("search");
1524
1525 assert_eq!(hits.len(), 2);
1526 assert_eq!(hits[0].doc_id, "wal-a");
1527 assert_eq!(hits[1].doc_id, "wal-b");
1528 assert!(hits[0].score >= hits[1].score);
1529 }
1530
1531 #[test]
1532 fn wal_entries_can_outrank_main_index() {
1533 let path = temp_index_path("wal-outranks-main");
1534 write_index(
1535 &path,
1536 &[
1537 ("main-a", vec![0.3, 0.0, 0.0, 0.0]),
1538 ("main-b", vec![0.2, 0.0, 0.0, 0.0]),
1539 ],
1540 )
1541 .expect("write index");
1542
1543 let mut index = VectorIndex::open(&path).expect("open index");
1544 index
1545 .append("wal-top", &[1.0, 0.0, 0.0, 0.0])
1546 .expect("append wal-top");
1547
1548 let hits = index
1549 .search_top_k(&[1.0, 0.0, 0.0, 0.0], 3, None)
1550 .expect("search");
1551
1552 assert_eq!(hits.len(), 3);
1553 assert_eq!(
1554 hits[0].doc_id, "wal-top",
1555 "WAL entry with highest score should rank first"
1556 );
1557 assert!(hits[0].score >= hits[1].score);
1558 assert!(hits[1].score >= hits[2].score);
1559 }
1560
1561 #[test]
1562 fn stale_main_entry_shadowed_by_wal() {
1563 let path = temp_index_path("stale-shadow");
1564 let mut writer =
1566 VectorIndex::create_with_revision(&path, "test", "r1", 2, Quantization::F32).unwrap();
1567 writer.write_record("doc-a", &[1.0, 0.0]).unwrap();
1568 writer.finish().unwrap();
1569
1570 let mut index = VectorIndex::open(&path).unwrap();
1571 index.append("doc-a", &[0.0, 1.0]).unwrap();
1573
1574 let hits = index.search_top_k(&[1.0, 0.0], 1, None).unwrap();
1579
1580 assert_eq!(hits.len(), 1);
1581 assert!(
1582 hits[0].score.abs() < f32::EPSILON,
1583 "Expected score 0.0 from WAL entry, but got leaked score {}",
1584 hits[0].score
1585 );
1586 }
1587
1588 #[test]
1589 fn wal_index_marker_out_of_bounds_returns_error() {
1590 let path = temp_index_path("wal-oob-index-marker");
1591 write_index(&path, &[("main-a", vec![1.0, 0.0, 0.0, 0.0])]).expect("write index");
1592
1593 let index = VectorIndex::open(&path).expect("open index");
1594 let fabricated = HeapEntry::new(to_wal_index(42), 1.0);
1595 let err = index
1596 .resolve_wal_hit(&fabricated)
1597 .expect_err("fabricated WAL marker should fail bounds check");
1598 assert!(matches!(err, SearchError::IndexCorrupted { .. }));
1599 }
1600
1601 #[test]
1602 fn heap_entry_nan_sorted_below_finite_scores() {
1603 let nan_entry = HeapEntry::new(0, f32::NAN);
1604 let finite_entry = HeapEntry::new(1, 0.5);
1605 assert!(candidate_is_better(finite_entry, nan_entry));
1607 assert!(!candidate_is_better(nan_entry, finite_entry));
1608 }
1609
1610 #[test]
1611 fn heap_entry_equal_scores_tiebreak_by_index() {
1612 let left = HeapEntry::new(3, 0.5);
1613 let right = HeapEntry::new(7, 0.5);
1614 assert!(candidate_is_better(left, right));
1616 assert!(!candidate_is_better(right, left));
1617 }
1618
1619 #[test]
1620 fn insert_candidate_with_limit_zero_is_noop() {
1621 let mut heap = BinaryHeap::new();
1622 insert_candidate(&mut heap, HeapEntry::new(0, 0.9), 0);
1623 assert!(heap.is_empty());
1624 }
1625
1626 #[test]
1627 fn insert_candidate_evicts_worst_when_full() {
1628 let mut heap = BinaryHeap::new();
1629 insert_candidate(&mut heap, HeapEntry::new(0, 0.1), 2);
1630 insert_candidate(&mut heap, HeapEntry::new(1, 0.5), 2);
1631 insert_candidate(&mut heap, HeapEntry::new(2, 0.9), 2);
1633
1634 let entries: Vec<HeapEntry> = heap.into_vec();
1635 assert_eq!(entries.len(), 2);
1636 let scores: Vec<f32> = entries.iter().map(|e| e.score).collect();
1637 assert!(scores.contains(&0.9));
1639 assert!(scores.contains(&0.5));
1640 assert!(!scores.contains(&0.1));
1641 }
1642
1643 #[test]
1644 fn insert_candidate_rejects_worse_when_full() {
1645 let mut heap = BinaryHeap::new();
1646 insert_candidate(&mut heap, HeapEntry::new(0, 0.5), 2);
1647 insert_candidate(&mut heap, HeapEntry::new(1, 0.9), 2);
1648 insert_candidate(&mut heap, HeapEntry::new(2, 0.1), 2);
1650
1651 let entries: Vec<HeapEntry> = heap.into_vec();
1652 assert_eq!(entries.len(), 2);
1653 let scores: Vec<f32> = entries.iter().map(|e| e.score).collect();
1654 assert!(scores.contains(&0.9));
1655 assert!(scores.contains(&0.5));
1656 assert!(!scores.contains(&0.1));
1657 }
1658
1659 #[test]
1660 fn merge_partial_heaps_preserves_top_k() {
1661 let mut heap_a = BinaryHeap::new();
1662 insert_candidate(&mut heap_a, HeapEntry::new(0, 0.9), 3);
1663 insert_candidate(&mut heap_a, HeapEntry::new(1, 0.1), 3);
1664
1665 let mut heap_b = BinaryHeap::new();
1666 insert_candidate(&mut heap_b, HeapEntry::new(2, 0.7), 3);
1667 insert_candidate(&mut heap_b, HeapEntry::new(3, 0.5), 3);
1668
1669 let merged = merge_partial_heaps(vec![heap_a, heap_b], 3);
1670 let entries: Vec<HeapEntry> = merged.into_vec();
1671 assert_eq!(entries.len(), 3);
1672 let scores: Vec<f32> = entries.iter().map(|e| e.score).collect();
1673 assert!(scores.contains(&0.9));
1675 assert!(scores.contains(&0.7));
1676 assert!(scores.contains(&0.5));
1677 assert!(!scores.contains(&0.1));
1678 }
1679
1680 #[test]
1681 fn parse_parallel_search_env_case_insensitive() {
1682 assert!(!parse_parallel_search_env(Some("OFF")));
1683 assert!(!parse_parallel_search_env(Some("False")));
1684 assert!(!parse_parallel_search_env(Some("NO")));
1685 assert!(!parse_parallel_search_env(Some(" off ")));
1686 }
1687
1688 #[test]
1689 fn parse_parallel_search_env_empty_string_enables() {
1690 assert!(parse_parallel_search_env(Some("")));
1692 assert!(parse_parallel_search_env(Some(" ")));
1693 }
1694
1695 #[test]
1696 fn parallel_filter_path_propagates_doc_id_errors() {
1697 let path = temp_index_path("parallel-filter-errors");
1698 write_index(
1699 &path,
1700 &[("doc-a", vec![1.0, 0.0]), ("doc-b", vec![0.0, 1.0])],
1701 )
1702 .expect("write index");
1703
1704 let inspect = VectorIndex::open(&path).expect("open index");
1705 let bad_idx = inspect
1706 .find_index_by_doc_hash(super::super::fnv1a_hash(b"doc-b"))
1707 .expect("doc-b index");
1708 let record = inspect.record_at(bad_idx).expect("record");
1709 let bad_offset =
1710 inspect.strings_offset + usize::try_from(record.doc_id_offset).unwrap_or(0);
1711 drop(inspect);
1712
1713 let mut bytes = fs::read(&path).expect("read index bytes");
1714 bytes[bad_offset] = 0xFF;
1715 fs::write(&path, bytes).expect("write corrupt bytes");
1716
1717 let index = VectorIndex::open(&path).expect("reopen index");
1718 let filter = PredicateFilter::new("allow-all", |_| true);
1719 let query = [1.0, 0.0];
1720
1721 let sequential = index.search_top_k_internal(&query, 1, Some(&filter), usize::MAX, 2, true);
1722 let parallel = index.search_top_k_internal(&query, 1, Some(&filter), 1, 2, true);
1723
1724 assert!(
1725 sequential.is_err(),
1726 "sequential path should surface doc_id errors"
1727 );
1728 assert!(
1729 parallel.is_err(),
1730 "parallel path should surface doc_id errors"
1731 );
1732 }
1733
1734 #[test]
1737 fn search_params_default_matches_constants() {
1738 let params = SearchParams::default();
1739 assert_eq!(params.parallel_threshold, PARALLEL_THRESHOLD);
1740 assert_eq!(params.parallel_chunk_size, PARALLEL_CHUNK_SIZE);
1741 }
1742
1743 #[test]
1744 fn search_top_k_with_params_matches_default_search() {
1745 let path = temp_index_path("with-params-default");
1746 let mut rows = Vec::new();
1747 for i in 0..32 {
1748 let rank = f32::from(u16::try_from(i).expect("fits u16"));
1749 rows.push((format!("doc-{i:03}"), vec![rank, 0.0, 0.0, 0.0]));
1750 }
1751 let refs: Vec<(&str, Vec<f32>)> = rows
1752 .iter()
1753 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
1754 .collect();
1755 write_index(&path, &refs).expect("write index");
1756
1757 let index = VectorIndex::open(&path).expect("open index");
1758 let query = [1.0, 0.0, 0.0, 0.0];
1759
1760 let default_hits = index.search_top_k(&query, 5, None).expect("default search");
1761 let params_hits = index
1762 .search_top_k_with_params(&query, 5, None, SearchParams::default())
1763 .expect("params search");
1764
1765 assert_eq!(default_hits.len(), params_hits.len());
1766 for (left, right) in default_hits.iter().zip(params_hits.iter()) {
1767 assert_eq!(left.doc_id, right.doc_id);
1768 assert_eq!(left.index, right.index);
1769 assert!((left.score - right.score).abs() < 1e-6);
1770 }
1771 }
1772
1773 #[test]
1774 fn search_top_k_with_params_custom_threshold() {
1775 let path = temp_index_path("with-params-custom");
1776 let mut rows = Vec::new();
1777 for i in 0..64 {
1778 let rank = f32::from(u16::try_from(i).expect("fits u16"));
1779 rows.push((
1780 format!("doc-{i:03}"),
1781 vec![rank, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
1782 ));
1783 }
1784 let refs: Vec<(&str, Vec<f32>)> = rows
1785 .iter()
1786 .map(|(doc_id, vec)| (doc_id.as_str(), vec.clone()))
1787 .collect();
1788 write_index(&path, &refs).expect("write index");
1789
1790 let index = VectorIndex::open(&path).expect("open index");
1791 let query = [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];
1792 let filter = PredicateFilter::new("even-docs", |doc_id| {
1793 let suffix = doc_id.strip_prefix("doc-").unwrap_or_default();
1794 suffix.parse::<u32>().is_ok_and(|v| v % 2 == 0)
1795 });
1796
1797 let sequential = index
1798 .search_top_k_internal(
1799 &query,
1800 10,
1801 Some(&filter),
1802 usize::MAX,
1803 PARALLEL_CHUNK_SIZE,
1804 true,
1805 )
1806 .expect("sequential search");
1807 let parallel = index
1808 .search_top_k_internal(&query, 10, Some(&filter), 1, 8, true)
1809 .expect("parallel search");
1810
1811 assert_eq!(sequential.len(), parallel.len());
1812 for (left, right) in sequential.iter().zip(parallel.iter()) {
1813 assert_eq!(left.doc_id, right.doc_id);
1814 assert!((left.score - right.score).abs() < 1e-6);
1815 }
1816 }
1817
1818 #[test]
1819 fn search_top_k_with_params_disabled_parallel() {
1820 let path = temp_index_path("with-params-disabled");
1821 write_index(
1822 &path,
1823 &[
1824 ("doc-a", vec![1.0, 0.0, 0.0, 0.0]),
1825 ("doc-b", vec![0.8, 0.0, 0.0, 0.0]),
1826 ],
1827 )
1828 .expect("write index");
1829
1830 let index = VectorIndex::open(&path).expect("open index");
1831 let query = [1.0, 0.0, 0.0, 0.0];
1832
1833 let params = SearchParams {
1834 parallel_threshold: 1, parallel_chunk_size: 1,
1836 parallel_enabled: false, };
1838 let hits = index
1839 .search_top_k_with_params(&query, 2, None, params)
1840 .expect("search with disabled parallel");
1841 assert_eq!(hits.len(), 2);
1842 assert_eq!(hits[0].doc_id, "doc-a");
1843 assert_eq!(hits[1].doc_id, "doc-b");
1844 }
1845
1846 #[test]
1849 fn search_params_debug_clone_copy() {
1850 let params = SearchParams {
1851 parallel_threshold: 100,
1852 parallel_chunk_size: 32,
1853 parallel_enabled: true,
1854 };
1855 let debug = format!("{params:?}");
1856 assert!(debug.contains("SearchParams"));
1857 assert!(debug.contains("100"));
1858
1859 let copied: SearchParams = params;
1860 assert_eq!(copied.parallel_threshold, 100);
1861 assert_eq!(copied.parallel_chunk_size, 32);
1862
1863 let cloned = params;
1864 assert!(cloned.parallel_enabled);
1865 }
1866
1867 #[test]
1868 fn compare_best_first_higher_score_wins() {
1869 let a = HeapEntry::new(0, 0.9);
1870 let b = HeapEntry::new(1, 0.5);
1871 assert_eq!(compare_best_first(&a, &b), Ordering::Less);
1872 assert_eq!(compare_best_first(&b, &a), Ordering::Greater);
1873 }
1874
1875 #[test]
1876 fn compare_best_first_equal_scores_tiebreak_by_index() {
1877 let a = HeapEntry::new(2, 0.7);
1878 let b = HeapEntry::new(5, 0.7);
1879 assert_eq!(compare_best_first(&a, &b), Ordering::Less);
1880 }
1881
1882 #[test]
1883 fn candidate_is_better_with_nan() {
1884 let good = HeapEntry::new(0, 0.5);
1885 let nan_entry = HeapEntry::new(1, f32::NAN);
1886 assert!(candidate_is_better(good, nan_entry));
1887 assert!(!candidate_is_better(nan_entry, good));
1888 }
1889
1890 #[test]
1891 fn candidate_is_better_equal_scores_lower_index_wins() {
1892 let a = HeapEntry::new(3, 0.8);
1893 let b = HeapEntry::new(7, 0.8);
1894 assert!(candidate_is_better(a, b));
1895 assert!(!candidate_is_better(b, a));
1896 }
1897
1898 #[test]
1899 fn score_key_maps_nan_to_neg_infinity() {
1900 assert_eq!(score_key(f32::NAN).to_bits(), f32::NEG_INFINITY.to_bits());
1901 assert!((score_key(0.5) - 0.5).abs() < f32::EPSILON);
1902 assert!((score_key(-0.3) - (-0.3)).abs() < f32::EPSILON);
1903 assert!((score_key(0.0)).abs() < f32::EPSILON);
1904 }
1905
1906 #[test]
1907 fn merge_partial_heaps_empty_list() {
1908 let merged = merge_partial_heaps(vec![], 10);
1909 assert!(merged.is_empty());
1910 }
1911
1912 #[test]
1913 fn merge_partial_heaps_single_heap() {
1914 let mut h = BinaryHeap::new();
1915 h.push(HeapEntry::new(0, 0.9));
1916 h.push(HeapEntry::new(1, 0.5));
1917 let merged = merge_partial_heaps(vec![h], 10);
1918 assert_eq!(merged.len(), 2);
1919 }
1920
1921 #[test]
1922 fn search_f32_quantization_index() {
1923 let path = temp_index_path("f32-quant-search");
1924 let dim = 4;
1925 let mut writer =
1926 VectorIndex::create_with_revision(&path, "test", "r1", dim, Quantization::F32).unwrap();
1927 writer.write_record("doc-a", &[1.0, 0.0, 0.0, 0.0]).unwrap();
1928 writer.write_record("doc-b", &[0.0, 1.0, 0.0, 0.0]).unwrap();
1929 writer.write_record("doc-c", &[0.5, 0.5, 0.0, 0.0]).unwrap();
1930 writer.finish().unwrap();
1931
1932 let index = VectorIndex::open(&path).unwrap();
1933 assert_eq!(index.quantization(), Quantization::F32);
1934
1935 let query = [1.0, 0.0, 0.0, 0.0];
1936 let hits = index.search_top_k(&query, 2, None).unwrap();
1937 assert_eq!(hits.len(), 2);
1938 assert_eq!(hits[0].doc_id, "doc-a");
1939
1940 fs::remove_file(&path).ok();
1941 }
1942
1943 #[test]
1944 fn search_f32_with_filter() {
1945 let path = temp_index_path("f32-filter");
1946 let dim = 4;
1947 let mut writer =
1948 VectorIndex::create_with_revision(&path, "test", "r1", dim, Quantization::F32).unwrap();
1949 writer.write_record("doc-a", &[1.0, 0.0, 0.0, 0.0]).unwrap();
1950 writer.write_record("doc-b", &[0.9, 0.1, 0.0, 0.0]).unwrap();
1951 writer.finish().unwrap();
1952
1953 let index = VectorIndex::open(&path).unwrap();
1954 let filter = PredicateFilter::new("only-b", |doc_id: &str| doc_id == "doc-b");
1955 let query = [1.0, 0.0, 0.0, 0.0];
1956 let hits = index.search_top_k(&query, 10, Some(&filter)).unwrap();
1957 assert_eq!(hits.len(), 1);
1958 assert_eq!(hits[0].doc_id, "doc-b");
1959
1960 fs::remove_file(&path).ok();
1961 }
1962
1963 }