1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use crate::structures::{CoarseCentroids, PQCodebook};
16#[cfg(feature = "native")]
17use rustc_hash::FxHashMap;
18#[cfg(feature = "native")]
19use std::sync::Arc;
20
21mod searcher;
22pub use searcher::Searcher;
23
24#[cfg(feature = "native")]
25mod reader;
26#[cfg(feature = "native")]
27mod vector_builder;
28#[cfg(feature = "native")]
29mod writer;
30#[cfg(feature = "native")]
31pub use reader::IndexReader;
32#[cfg(feature = "native")]
33pub use writer::IndexWriter;
34
35mod metadata;
36pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
37
38#[cfg(feature = "native")]
39mod helpers;
40#[cfg(feature = "native")]
41pub use helpers::{
42 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
43 index_documents_from_reader, index_json_document, parse_schema,
44};
45
/// File name of the on-disk slice cache stored alongside the index files.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
48
/// Tuning knobs for an [`Index`]: thread counts, cache sizes, the indexing
/// memory budget, the merge policy and reader reload behavior.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// General-purpose thread count (defaults to the number of CPUs).
    pub num_threads: usize,
    /// Number of threads used for indexing (defaults to 1).
    pub num_indexing_threads: usize,
    /// Number of threads used for compression work (defaults to CPU count).
    pub num_compression_threads: usize,
    /// Number of cached blocks for the term dictionary.
    pub term_cache_blocks: usize,
    /// Number of cached blocks for the document store.
    pub store_cache_blocks: usize,
    /// Indexing memory budget in bytes.
    // NOTE(review): tests shrink this value to force multiple segments, so it
    // presumably triggers a segment flush when exceeded — confirm in writer.
    pub max_indexing_memory_bytes: usize,
    /// Policy deciding which segments get merged together.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index layout/optimization settings.
    pub optimization: crate::structures::IndexOptimization,
    /// Interval (milliseconds) passed to the reader for refresh scheduling.
    pub reload_interval_ms: u64,
}
71
72impl Default for IndexConfig {
73 fn default() -> Self {
74 #[cfg(feature = "native")]
75 let cpus = num_cpus::get().max(1);
76 #[cfg(not(feature = "native"))]
77 let cpus = 1;
78
79 Self {
80 num_threads: cpus,
81 num_indexing_threads: 1,
82 num_compression_threads: cpus,
83 term_cache_blocks: 256,
84 store_cache_blocks: 32,
85 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
87 optimization: crate::structures::IndexOptimization::default(),
88 reload_interval_ms: 1000, }
90 }
91}
92
/// A search index bound to a directory implementation `D`.
///
/// Owns the schema, configuration and segment manager, plus any trained
/// vector-quantization structures loaded from disk, and lazily creates a
/// single shared [`IndexReader`].
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // Trained IVF coarse centroids, keyed by field id.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    // Trained product-quantization codebooks, keyed by field id.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    // Lazily-initialized reader shared by all search entry points.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates a brand-new index in `directory` with the given `schema`.
    ///
    /// Fresh [`IndexMetadata`] is handed to the segment manager and persisted
    /// immediately so the directory can later be re-opened with
    /// [`Index::open`].
    ///
    /// # Errors
    /// Returns an error if persisting the initial metadata fails.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // No-op metadata update — presumably forces the initial metadata file
        // to be written to the directory (confirm in SegmentManager).
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            // A fresh index has no trained vector structures yet.
            trained_centroids: FxHashMap::default(),
            trained_codebooks: FxHashMap::default(),
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }
145
    /// Opens an existing index from `directory`.
    ///
    /// Loads the persisted metadata (which embeds the schema) and any
    /// previously trained vector structures (IVF centroids / PQ codebooks).
    ///
    /// # Errors
    /// Returns an error if the metadata cannot be loaded.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Trained structures are optional: when absent, both maps stay empty
        // (tests in this file show flat vector search still works then).
        let trained = metadata.load_trained_structures(directory.as_ref()).await;
        let trained_centroids = trained
            .as_ref()
            .map(|t| t.centroids.clone())
            .unwrap_or_default();
        let trained_codebooks = trained
            .as_ref()
            .map(|t| t.codebooks.clone())
            .unwrap_or_default();

        log::debug!(
            "[Index::open] trained_centroids fields={:?}, trained_codebooks fields={:?}",
            trained_centroids.keys().collect::<Vec<_>>(),
            trained_codebooks.keys().collect::<Vec<_>>(),
        );

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            trained_centroids,
            trained_codebooks,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }
189
190 pub fn schema(&self) -> &Schema {
192 &self.schema
193 }
194
195 pub fn directory(&self) -> &D {
197 &self.directory
198 }
199
200 pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
202 &self.segment_manager
203 }
204
    /// Returns the shared [`IndexReader`], creating it on first use.
    ///
    /// Initialization goes through `OnceCell::get_or_try_init`, so the reader
    /// is built at most once; a failed initialization leaves the cell empty
    /// and can be retried on a later call.
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }
222
    /// The configuration this index was created/opened with.
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    /// Trained IVF coarse centroids keyed by field id (empty until a vector
    /// index has been built).
    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
        &self.trained_centroids
    }

    /// Trained product-quantization codebooks keyed by field id.
    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
        &self.trained_codebooks
    }
237
238 pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
240 let reader = self.reader().await?;
241 let searcher = reader.searcher().await?;
242 Ok(searcher.segment_readers().to_vec())
243 }
244
245 pub async fn num_docs(&self) -> Result<u32> {
247 let reader = self.reader().await?;
248 let searcher = reader.searcher().await?;
249 Ok(searcher.num_docs())
250 }
251
252 pub fn default_fields(&self) -> Vec<crate::Field> {
254 if !self.schema.default_fields().is_empty() {
255 self.schema.default_fields().to_vec()
256 } else {
257 self.schema
258 .fields()
259 .filter(|(_, entry)| {
260 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
261 })
262 .map(|(field, _)| field)
263 .collect()
264 }
265 }
266
    /// Tokenizer registry used for query parsing.
    ///
    /// NOTE(review): this constructs a fresh default registry on every call,
    /// so tokenizers registered elsewhere would not be reflected here.
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }
271
    /// Builds a query-language parser for this index.
    ///
    /// If the schema defines query-routing rules and they compile into a
    /// `QueryFieldRouter`, the parser is constructed with that router;
    /// otherwise (including when router construction fails) a plain parser
    /// over the default fields is returned.
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }
291
    /// Parses `query_str` and returns the top `limit` hits.
    ///
    /// Convenience wrapper over [`Self::query_offset`] with `offset = 0`.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
300
301 pub async fn query_offset(
303 &self,
304 query_str: &str,
305 limit: usize,
306 offset: usize,
307 ) -> Result<crate::query::SearchResponse> {
308 let parser = self.query_parser();
309 let query = parser
310 .parse(query_str)
311 .map_err(crate::error::Error::Query)?;
312 self.search_offset(query.as_ref(), limit, offset).await
313 }
314
    /// Executes an already-built query, returning the top `limit` hits.
    ///
    /// Convenience wrapper over [`Self::search_offset`] with `offset = 0`.
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }
323
    /// Executes `query` against all segments and returns a paginated response.
    ///
    /// Every segment is searched concurrently for its top `offset + limit`
    /// results; per-segment results are merged, sorted by descending score,
    /// and the global `offset`/`limit` window is applied.
    ///
    /// NOTE(review): `total_hits` counts only the hits actually collected,
    /// which is capped at `offset + limit` per segment — it can undercount
    /// the true number of matching documents.
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

        // Each segment must surface enough hits to fill the requested window
        // even if every surviving hit comes from a single segment.
        let fetch_limit = offset + limit;

        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    // Tag each hit with its segment id so a global DocAddress
                    // can be formed after merging.
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        // Sort descending by score; NaN comparisons fall back to Equal
        // instead of panicking. Stable sort keeps segment order for ties.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }
382
383 pub async fn get_document(
385 &self,
386 address: &crate::query::DocAddress,
387 ) -> Result<Option<crate::dsl::Document>> {
388 let reader = self.reader().await?;
389 let searcher = reader.searcher().await?;
390 searcher.get_document(address).await
391 }
392
    /// Refreshes the index view.
    ///
    /// Currently a no-op: the cached reader is constructed with
    /// `reload_interval_ms`, which presumably makes it refresh itself on a
    /// schedule — confirm in `IndexReader`. Kept for API compatibility.
    pub async fn reload(&self) -> Result<()> {
        Ok(())
    }
398
399 pub async fn get_postings(
401 &self,
402 field: crate::Field,
403 term: &[u8],
404 ) -> Result<
405 Vec<(
406 Arc<crate::segment::SegmentReader>,
407 crate::structures::BlockPostingList,
408 )>,
409 > {
410 let segments = self.segment_readers().await?;
411 let mut results = Vec::new();
412
413 for segment in segments {
414 if let Some(postings) = segment.get_postings(field, term).await? {
415 results.push((segment, postings));
416 }
417 }
418
419 Ok(results)
420 }
421}
422
/// Writer construction (separate `impl` block).
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates an [`IndexWriter`] from this index.
    // NOTE(review): what state the writer shares with the index depends on
    // `IndexWriter::from_index` — see the writer module.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}
431
432#[cfg(test)]
435mod tests {
436 use super::*;
437 use crate::directories::RamDirectory;
438 use crate::dsl::{Document, SchemaBuilder};
439
    /// End-to-end smoke test: create an index, add two docs, commit, re-open,
    /// then verify postings lookup and stored-document retrieval.
    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Lowercase "world" matches both titles, implying lowercasing analysis.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }
482
    /// A tiny indexing-memory budget must force the writer to produce
    /// multiple segments across commits.
    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            // Deliberately tiny so each batch spills into its own segment.
            max_indexing_memory_bytes: 1024,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }
518
    /// `force_merge` must collapse several flushed segments into exactly one,
    /// with every document still retrievable afterwards.
    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        // All nine doc ids must survive the merge.
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }
575
    /// Query-language parsing end to end: single- and multi-term queries,
    /// hit addresses, and stored-document retrieval from the hit address.
    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }
632
    /// Searching through a `SliceCachingDirectory` must populate its cache.
    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // 1 MiB cache capacity wrapped around the RAM directory.
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }
670
    /// Multi-valued text fields: storage order, JSON rendering as an array,
    /// and per-value searchability via the query language.
    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Values must come back in insertion order.
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // JSON projection renders multi-value fields as arrays.
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Every individual value must be searchable.
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
739
    /// Boolean OR/MUST/MUST_NOT semantics over a small corpus, including the
    /// top-k truncation path used by OR queries.
    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // doc 0: has "rust" and "programming"
        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        // doc 1: has "rust" and "systems"
        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        // doc 2: has "programming" only
        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        // doc 3: matches neither OR term
        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        // doc 4: repeated terms plus "systems"
        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // Limit below the match count must truncate to the top-k.
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }
856
    /// The WAND-optimized OR query must return exactly the same document set
    /// as the standard boolean OR query.
    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Cycle through four term combinations across 10 docs.
        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }
916
    /// Below `build_threshold`, vector search runs flat and no centroids are
    /// trained; crossing the threshold must trigger the IVF build, after
    /// which re-opening the index loads the trained centroids.
    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4),
                nprobe: 2,
                // The IVF index is only built once 50 vectors exist.
                build_threshold: Some(50),
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 30 docs: below the build threshold of 50.
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.is_empty(),
            "Should not have trained centroids below threshold"
        );

        // Flat (brute-force) search must still work without a trained index.
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // 30 more docs pushes the total past the threshold.
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        assert!(
            writer.is_vector_index_built(embedding).await,
            "Vector index should be built after crossing threshold"
        );

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.contains_key(&embedding.0),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // An explicit rebuild must be idempotent once the index exists.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.is_vector_index_built(embedding).await);
    }
1039
    /// Two full ingest/merge rounds: result counts and stored-document
    /// retrieval must stay correct across flush, merge, more ingestion, and
    /// a second merge.
    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Round 1: 5 batches x 10 docs, flushed per batch.
        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        // Merge 1.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        // Round 2: 3 more batches with distinct terms.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        // Merge 2.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }
1192
    /// 200 docs across 8 flushed batches: per-batch unique terms and the
    /// shared term must produce identical counts before and after a merge.
    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 8 batches x 25 docs = 200; each batch carries a unique term.
        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        // Pre-merge: each batch term matches exactly its 25 docs.
        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        // Post-merge: counts must be unchanged.
        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        for i in 0..total_docs {
            let doc = searcher.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }
1273}