1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::IndexWriter;
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39 index_documents_from_reader, index_json_document, parse_schema,
40};
41
/// File name `index.slicecache`.
///
/// NOTE(review): presumably the name under which slice-cache data is stored
/// next to an index; nothing in this module reads it, so confirm against callers.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
/// Tunable settings shared by the index reader and writer.
///
/// The `Default` implementation in this file supplies the default value quoted
/// on each field.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// General thread count. Defaults to the detected CPU count with the
    /// `native` feature, otherwise `1`.
    pub num_threads: usize,
    /// Thread count for indexing (presumably used by the writer; not read in
    /// this module). Defaults to `1`.
    pub num_indexing_threads: usize,
    /// Thread count for compression (presumably used by the writer; not read
    /// in this module). Defaults to the same value as `num_threads`.
    pub num_compression_threads: usize,
    /// Passed to the segment manager and to the index reader when they are
    /// constructed. Defaults to `256`.
    pub term_cache_blocks: usize,
    /// Store cache size in blocks (not read in this module). Defaults to `32`.
    pub store_cache_blocks: usize,
    /// Memory budget for indexing in bytes (not read in this module).
    /// Defaults to 256 MiB.
    pub max_indexing_memory_bytes: usize,
    /// Merge policy; a boxed clone (`clone_box`) is handed to the segment
    /// manager. Defaults to `TieredMergePolicy::default()`.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization settings (not read in this module). Defaults to
    /// `IndexOptimization::default()`.
    pub optimization: crate::structures::IndexOptimization,
    /// Passed to the index reader on construction; by its name a reload
    /// interval in milliseconds. Defaults to `1000`.
    pub reload_interval_ms: u64,
}
67
68impl Default for IndexConfig {
69 fn default() -> Self {
70 #[cfg(feature = "native")]
71 let cpus = num_cpus::get().max(1);
72 #[cfg(not(feature = "native"))]
73 let cpus = 1;
74
75 Self {
76 num_threads: cpus,
77 num_indexing_threads: 1,
78 num_compression_threads: cpus,
79 term_cache_blocks: 256,
80 store_cache_blocks: 32,
81 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
83 optimization: crate::structures::IndexOptimization::default(),
84 reload_interval_ms: 1000, }
86 }
87}
88
/// Handle to an index stored in a directory of type `D`.
///
/// Bundles the directory, the schema, the configuration and the segment
/// manager, and lazily creates a single shared [`IndexReader`].
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Storage backend the index lives in.
    directory: Arc<D>,
    /// Schema the index was created with (or loaded from metadata on open).
    schema: Arc<Schema>,
    /// Configuration supplied by the caller at create/open time.
    config: IndexConfig,
    /// Segment manager shared with the reader built by `reader()`.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Reader created on the first successful `reader()` call and reused afterwards.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
107
108#[cfg(feature = "native")]
109impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
110 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
112 let directory = Arc::new(directory);
113 let schema = Arc::new(schema);
114 let metadata = IndexMetadata::new((*schema).clone());
115
116 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
117 Arc::clone(&directory),
118 Arc::clone(&schema),
119 metadata,
120 config.merge_policy.clone_box(),
121 config.term_cache_blocks,
122 ));
123
124 segment_manager.update_metadata(|_| {}).await?;
126
127 Ok(Self {
128 directory,
129 schema,
130 config,
131 segment_manager,
132 cached_reader: tokio::sync::OnceCell::new(),
133 })
134 }
135
136 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
138 let directory = Arc::new(directory);
139
140 let metadata = IndexMetadata::load(directory.as_ref()).await?;
142 let schema = Arc::new(metadata.schema.clone());
143
144 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
145 Arc::clone(&directory),
146 Arc::clone(&schema),
147 metadata,
148 config.merge_policy.clone_box(),
149 config.term_cache_blocks,
150 ));
151
152 segment_manager.load_and_publish_trained().await;
154
155 Ok(Self {
156 directory,
157 schema,
158 config,
159 segment_manager,
160 cached_reader: tokio::sync::OnceCell::new(),
161 })
162 }
163
164 pub fn schema(&self) -> &Schema {
166 &self.schema
167 }
168
169 pub fn directory(&self) -> &D {
171 &self.directory
172 }
173
174 pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
176 &self.segment_manager
177 }
178
179 pub async fn reader(&self) -> Result<&IndexReader<D>> {
184 self.cached_reader
185 .get_or_try_init(|| async {
186 IndexReader::from_segment_manager(
187 Arc::clone(&self.schema),
188 Arc::clone(&self.segment_manager),
189 self.config.term_cache_blocks,
190 self.config.reload_interval_ms,
191 )
192 .await
193 })
194 .await
195 }
196
197 pub fn config(&self) -> &IndexConfig {
199 &self.config
200 }
201
202 pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
204 let reader = self.reader().await?;
205 let searcher = reader.searcher().await?;
206 Ok(searcher.segment_readers().to_vec())
207 }
208
209 pub async fn num_docs(&self) -> Result<u32> {
211 let reader = self.reader().await?;
212 let searcher = reader.searcher().await?;
213 Ok(searcher.num_docs())
214 }
215
216 pub fn default_fields(&self) -> Vec<crate::Field> {
218 if !self.schema.default_fields().is_empty() {
219 self.schema.default_fields().to_vec()
220 } else {
221 self.schema
222 .fields()
223 .filter(|(_, entry)| {
224 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
225 })
226 .map(|(field, _)| field)
227 .collect()
228 }
229 }
230
231 pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
233 Arc::new(crate::tokenizer::TokenizerRegistry::default())
234 }
235
236 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
238 let default_fields = self.default_fields();
239 let tokenizers = self.tokenizers();
240
241 let query_routers = self.schema.query_routers();
242 if !query_routers.is_empty()
243 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
244 {
245 return crate::dsl::QueryLanguageParser::with_router(
246 Arc::clone(&self.schema),
247 default_fields,
248 tokenizers,
249 router,
250 );
251 }
252
253 crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
254 }
255
256 pub async fn query(
258 &self,
259 query_str: &str,
260 limit: usize,
261 ) -> Result<crate::query::SearchResponse> {
262 self.query_offset(query_str, limit, 0).await
263 }
264
265 pub async fn query_offset(
267 &self,
268 query_str: &str,
269 limit: usize,
270 offset: usize,
271 ) -> Result<crate::query::SearchResponse> {
272 let parser = self.query_parser();
273 let query = parser
274 .parse(query_str)
275 .map_err(crate::error::Error::Query)?;
276 self.search_offset(query.as_ref(), limit, offset).await
277 }
278
279 pub async fn search(
281 &self,
282 query: &dyn crate::query::Query,
283 limit: usize,
284 ) -> Result<crate::query::SearchResponse> {
285 self.search_offset(query, limit, 0).await
286 }
287
288 pub async fn search_offset(
290 &self,
291 query: &dyn crate::query::Query,
292 limit: usize,
293 offset: usize,
294 ) -> Result<crate::query::SearchResponse> {
295 let reader = self.reader().await?;
296 let searcher = reader.searcher().await?;
297 let segments = searcher.segment_readers();
298
299 let fetch_limit = offset + limit;
300
301 let futures: Vec<_> = segments
302 .iter()
303 .map(|segment| {
304 let sid = segment.meta().id;
305 async move {
306 let results =
307 crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
308 Ok::<_, crate::error::Error>(
309 results
310 .into_iter()
311 .map(move |r| (sid, r))
312 .collect::<Vec<_>>(),
313 )
314 }
315 })
316 .collect();
317
318 let batches = futures::future::try_join_all(futures).await?;
319 let mut all_results: Vec<(u128, crate::query::SearchResult)> =
320 Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
321 for batch in batches {
322 all_results.extend(batch);
323 }
324
325 all_results.sort_by(|a, b| {
326 b.1.score
327 .partial_cmp(&a.1.score)
328 .unwrap_or(std::cmp::Ordering::Equal)
329 });
330
331 let total_hits = all_results.len() as u32;
332
333 let hits: Vec<crate::query::SearchHit> = all_results
334 .into_iter()
335 .skip(offset)
336 .take(limit)
337 .map(|(segment_id, result)| crate::query::SearchHit {
338 address: crate::query::DocAddress::new(segment_id, result.doc_id),
339 score: result.score,
340 matched_fields: result.extract_ordinals(),
341 })
342 .collect();
343
344 Ok(crate::query::SearchResponse { hits, total_hits })
345 }
346
347 pub async fn get_document(
349 &self,
350 address: &crate::query::DocAddress,
351 ) -> Result<Option<crate::dsl::Document>> {
352 let reader = self.reader().await?;
353 let searcher = reader.searcher().await?;
354 searcher.get_document(address).await
355 }
356
357 pub async fn reload(&self) -> Result<()> {
359 Ok(())
361 }
362
363 pub async fn get_postings(
365 &self,
366 field: crate::Field,
367 term: &[u8],
368 ) -> Result<
369 Vec<(
370 Arc<crate::segment::SegmentReader>,
371 crate::structures::BlockPostingList,
372 )>,
373 > {
374 let segments = self.segment_readers().await?;
375 let mut results = Vec::new();
376
377 for segment in segments {
378 if let Some(postings) = segment.get_postings(field, term).await? {
379 results.push((segment, postings));
380 }
381 }
382
383 Ok(results)
384 }
385}
386
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates an [`IndexWriter`] for this index by delegating to
    /// `IndexWriter::from_index`.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}
395
396#[cfg(test)]
399mod tests {
400 use super::*;
401 use crate::directories::RamDirectory;
402 use crate::dsl::{Document, SchemaBuilder};
403
404 #[tokio::test]
405 async fn test_index_create_and_search() {
406 let mut schema_builder = SchemaBuilder::default();
407 let title = schema_builder.add_text_field("title", true, true);
408 let body = schema_builder.add_text_field("body", true, true);
409 let schema = schema_builder.build();
410
411 let dir = RamDirectory::new();
412 let config = IndexConfig::default();
413
414 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
416 .await
417 .unwrap();
418
419 let mut doc1 = Document::new();
420 doc1.add_text(title, "Hello World");
421 doc1.add_text(body, "This is the first document");
422 writer.add_document(doc1).unwrap();
423
424 let mut doc2 = Document::new();
425 doc2.add_text(title, "Goodbye World");
426 doc2.add_text(body, "This is the second document");
427 writer.add_document(doc2).unwrap();
428
429 writer.commit().await.unwrap();
430
431 let index = Index::open(dir, config).await.unwrap();
433 assert_eq!(index.num_docs().await.unwrap(), 2);
434
435 let postings = index.get_postings(title, b"world").await.unwrap();
437 assert_eq!(postings.len(), 1); assert_eq!(postings[0].1.doc_count(), 2); let reader = index.reader().await.unwrap();
442 let searcher = reader.searcher().await.unwrap();
443 let doc = searcher.doc(0).await.unwrap().unwrap();
444 assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
445 }
446
447 #[tokio::test]
448 async fn test_multiple_segments() {
449 let mut schema_builder = SchemaBuilder::default();
450 let title = schema_builder.add_text_field("title", true, true);
451 let schema = schema_builder.build();
452
453 let dir = RamDirectory::new();
454 let config = IndexConfig {
455 max_indexing_memory_bytes: 1024, ..Default::default()
457 };
458
459 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
460 .await
461 .unwrap();
462
463 for batch in 0..3 {
465 for i in 0..5 {
466 let mut doc = Document::new();
467 doc.add_text(title, format!("Document {} batch {}", i, batch));
468 writer.add_document(doc).unwrap();
469 }
470 writer.commit().await.unwrap();
471 }
472
473 let index = Index::open(dir, config).await.unwrap();
475 assert_eq!(index.num_docs().await.unwrap(), 15);
476 assert!(
478 index.segment_readers().await.unwrap().len() >= 2,
479 "Expected multiple segments"
480 );
481 }
482
483 #[tokio::test]
484 async fn test_segment_merge() {
485 let mut schema_builder = SchemaBuilder::default();
486 let title = schema_builder.add_text_field("title", true, true);
487 let schema = schema_builder.build();
488
489 let dir = RamDirectory::new();
490 let config = IndexConfig {
491 max_indexing_memory_bytes: 512, ..Default::default()
493 };
494
495 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
496 .await
497 .unwrap();
498
499 for batch in 0..3 {
501 for i in 0..3 {
502 let mut doc = Document::new();
503 doc.add_text(title, format!("Document {} batch {}", i, batch));
504 writer.add_document(doc).unwrap();
505 }
506 writer.flush().await.unwrap();
507 }
508 writer.commit().await.unwrap();
509
510 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
512 assert!(
513 index.segment_readers().await.unwrap().len() >= 2,
514 "Expected multiple segments"
515 );
516
517 let writer = IndexWriter::open(dir.clone(), config.clone())
519 .await
520 .unwrap();
521 writer.force_merge().await.unwrap();
522
523 let index = Index::open(dir, config).await.unwrap();
525 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
526 assert_eq!(index.num_docs().await.unwrap(), 9);
527
528 let reader = index.reader().await.unwrap();
530 let searcher = reader.searcher().await.unwrap();
531 let mut found_docs = 0;
532 for i in 0..9 {
533 if searcher.doc(i).await.unwrap().is_some() {
534 found_docs += 1;
535 }
536 }
537 assert_eq!(found_docs, 9);
538 }
539
540 #[tokio::test]
541 async fn test_match_query() {
542 let mut schema_builder = SchemaBuilder::default();
543 let title = schema_builder.add_text_field("title", true, true);
544 let body = schema_builder.add_text_field("body", true, true);
545 let schema = schema_builder.build();
546
547 let dir = RamDirectory::new();
548 let config = IndexConfig::default();
549
550 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
551 .await
552 .unwrap();
553
554 let mut doc1 = Document::new();
555 doc1.add_text(title, "rust programming");
556 doc1.add_text(body, "Learn rust language");
557 writer.add_document(doc1).unwrap();
558
559 let mut doc2 = Document::new();
560 doc2.add_text(title, "python programming");
561 doc2.add_text(body, "Learn python language");
562 writer.add_document(doc2).unwrap();
563
564 writer.commit().await.unwrap();
565
566 let index = Index::open(dir, config).await.unwrap();
567
568 let results = index.query("rust", 10).await.unwrap();
570 assert_eq!(results.hits.len(), 1);
571
572 let results = index.query("rust programming", 10).await.unwrap();
574 assert!(!results.hits.is_empty());
575
576 let hit = &results.hits[0];
578 assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
579
580 let doc = index.get_document(&hit.address).await.unwrap().unwrap();
582 assert!(
583 !doc.field_values().is_empty(),
584 "Doc should have field values"
585 );
586
587 let reader = index.reader().await.unwrap();
589 let searcher = reader.searcher().await.unwrap();
590 let doc = searcher.doc(0).await.unwrap().unwrap();
591 assert!(
592 !doc.field_values().is_empty(),
593 "Doc should have field values"
594 );
595 }
596
597 #[tokio::test]
598 async fn test_slice_cache_warmup_and_load() {
599 use crate::directories::SliceCachingDirectory;
600
601 let mut schema_builder = SchemaBuilder::default();
602 let title = schema_builder.add_text_field("title", true, true);
603 let body = schema_builder.add_text_field("body", true, true);
604 let schema = schema_builder.build();
605
606 let dir = RamDirectory::new();
607 let config = IndexConfig::default();
608
609 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
611 .await
612 .unwrap();
613
614 for i in 0..10 {
615 let mut doc = Document::new();
616 doc.add_text(title, format!("Document {} about rust", i));
617 doc.add_text(body, format!("This is body text number {}", i));
618 writer.add_document(doc).unwrap();
619 }
620 writer.commit().await.unwrap();
621
622 let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
624 let index = Index::open(caching_dir, config.clone()).await.unwrap();
625
626 let results = index.query("rust", 10).await.unwrap();
628 assert!(!results.hits.is_empty());
629
630 let stats = index.directory.stats();
632 assert!(stats.total_bytes > 0, "Cache should have data after search");
633 }
634
635 #[tokio::test]
636 async fn test_multivalue_field_indexing_and_search() {
637 let mut schema_builder = SchemaBuilder::default();
638 let uris = schema_builder.add_text_field("uris", true, true);
639 let title = schema_builder.add_text_field("title", true, true);
640 let schema = schema_builder.build();
641
642 let dir = RamDirectory::new();
643 let config = IndexConfig::default();
644
645 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
647 .await
648 .unwrap();
649
650 let mut doc = Document::new();
651 doc.add_text(uris, "one");
652 doc.add_text(uris, "two");
653 doc.add_text(title, "Test Document");
654 writer.add_document(doc).unwrap();
655
656 let mut doc2 = Document::new();
658 doc2.add_text(uris, "three");
659 doc2.add_text(title, "Another Document");
660 writer.add_document(doc2).unwrap();
661
662 writer.commit().await.unwrap();
663
664 let index = Index::open(dir, config).await.unwrap();
666 assert_eq!(index.num_docs().await.unwrap(), 2);
667
668 let reader = index.reader().await.unwrap();
670 let searcher = reader.searcher().await.unwrap();
671 let doc = searcher.doc(0).await.unwrap().unwrap();
672 let all_uris: Vec<_> = doc.get_all(uris).collect();
673 assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
674 assert_eq!(all_uris[0].as_text(), Some("one"));
675 assert_eq!(all_uris[1].as_text(), Some("two"));
676
677 let json = doc.to_json(index.schema());
679 let uris_json = json.get("uris").unwrap();
680 assert!(uris_json.is_array(), "Multi-value field should be an array");
681 let uris_arr = uris_json.as_array().unwrap();
682 assert_eq!(uris_arr.len(), 2);
683 assert_eq!(uris_arr[0].as_str(), Some("one"));
684 assert_eq!(uris_arr[1].as_str(), Some("two"));
685
686 let results = index.query("uris:one", 10).await.unwrap();
688 assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
689 assert_eq!(results.hits[0].address.doc_id, 0);
690
691 let results = index.query("uris:two", 10).await.unwrap();
692 assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
693 assert_eq!(results.hits[0].address.doc_id, 0);
694
695 let results = index.query("uris:three", 10).await.unwrap();
696 assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
697 assert_eq!(results.hits[0].address.doc_id, 1);
698
699 let results = index.query("uris:nonexistent", 10).await.unwrap();
701 assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
702 }
703
704 #[tokio::test]
711 async fn test_wand_optimization_for_or_queries() {
712 use crate::query::{BooleanQuery, TermQuery};
713
714 let mut schema_builder = SchemaBuilder::default();
715 let content = schema_builder.add_text_field("content", true, true);
716 let schema = schema_builder.build();
717
718 let dir = RamDirectory::new();
719 let config = IndexConfig::default();
720
721 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
723 .await
724 .unwrap();
725
726 let mut doc = Document::new();
728 doc.add_text(content, "rust programming language is fast");
729 writer.add_document(doc).unwrap();
730
731 let mut doc = Document::new();
733 doc.add_text(content, "rust is a systems language");
734 writer.add_document(doc).unwrap();
735
736 let mut doc = Document::new();
738 doc.add_text(content, "programming is fun");
739 writer.add_document(doc).unwrap();
740
741 let mut doc = Document::new();
743 doc.add_text(content, "python is easy to learn");
744 writer.add_document(doc).unwrap();
745
746 let mut doc = Document::new();
748 doc.add_text(content, "rust rust programming programming systems");
749 writer.add_document(doc).unwrap();
750
751 writer.commit().await.unwrap();
752
753 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
755
756 let or_query = BooleanQuery::new()
758 .should(TermQuery::text(content, "rust"))
759 .should(TermQuery::text(content, "programming"));
760
761 let results = index.search(&or_query, 10).await.unwrap();
762
763 assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
765
766 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
767 assert!(doc_ids.contains(&0), "Should find doc 0");
768 assert!(doc_ids.contains(&1), "Should find doc 1");
769 assert!(doc_ids.contains(&2), "Should find doc 2");
770 assert!(doc_ids.contains(&4), "Should find doc 4");
771 assert!(
772 !doc_ids.contains(&3),
773 "Should NOT find doc 3 (only has 'python')"
774 );
775
776 let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
778
779 let results = index.search(&single_query, 10).await.unwrap();
780 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
781
782 let must_query = BooleanQuery::new()
784 .must(TermQuery::text(content, "rust"))
785 .should(TermQuery::text(content, "programming"));
786
787 let results = index.search(&must_query, 10).await.unwrap();
788 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
790
791 let must_not_query = BooleanQuery::new()
793 .should(TermQuery::text(content, "rust"))
794 .should(TermQuery::text(content, "programming"))
795 .must_not(TermQuery::text(content, "systems"));
796
797 let results = index.search(&must_not_query, 10).await.unwrap();
798 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
800 assert!(
801 !doc_ids.contains(&1),
802 "Should NOT find doc 1 (has 'systems')"
803 );
804 assert!(
805 !doc_ids.contains(&4),
806 "Should NOT find doc 4 (has 'systems')"
807 );
808
809 let or_query = BooleanQuery::new()
811 .should(TermQuery::text(content, "rust"))
812 .should(TermQuery::text(content, "programming"));
813
814 let results = index.search(&or_query, 2).await.unwrap();
815 assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
816
817 }
820
821 #[tokio::test]
823 async fn test_wand_results_match_standard_boolean() {
824 use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
825
826 let mut schema_builder = SchemaBuilder::default();
827 let content = schema_builder.add_text_field("content", true, true);
828 let schema = schema_builder.build();
829
830 let dir = RamDirectory::new();
831 let config = IndexConfig::default();
832
833 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
834 .await
835 .unwrap();
836
837 for i in 0..10 {
839 let mut doc = Document::new();
840 let text = match i % 4 {
841 0 => "apple banana cherry",
842 1 => "apple orange",
843 2 => "banana grape",
844 _ => "cherry date",
845 };
846 doc.add_text(content, text);
847 writer.add_document(doc).unwrap();
848 }
849
850 writer.commit().await.unwrap();
851 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
852
853 let wand_query = WandOrQuery::new(content).term("apple").term("banana");
855
856 let bool_query = BooleanQuery::new()
857 .should(TermQuery::text(content, "apple"))
858 .should(TermQuery::text(content, "banana"));
859
860 let wand_results = index.search(&wand_query, 10).await.unwrap();
861 let bool_results = index.search(&bool_query, 10).await.unwrap();
862
863 assert_eq!(
865 wand_results.hits.len(),
866 bool_results.hits.len(),
867 "WAND and Boolean should find same number of docs"
868 );
869
870 let wand_docs: std::collections::HashSet<u32> =
871 wand_results.hits.iter().map(|h| h.address.doc_id).collect();
872 let bool_docs: std::collections::HashSet<u32> =
873 bool_results.hits.iter().map(|h| h.address.doc_id).collect();
874
875 assert_eq!(
876 wand_docs, bool_docs,
877 "WAND and Boolean should find same documents"
878 );
879 }
880
881 #[tokio::test]
882 async fn test_vector_index_threshold_switch() {
883 use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
884
885 let mut schema_builder = SchemaBuilder::default();
887 let title = schema_builder.add_text_field("title", true, true);
888 let embedding = schema_builder.add_dense_vector_field_with_config(
889 "embedding",
890 true, true, DenseVectorConfig {
893 dim: 8,
894 index_type: VectorIndexType::IvfRaBitQ,
895 quantization: DenseVectorQuantization::F32,
896 num_clusters: Some(4), nprobe: 2,
898 build_threshold: Some(50), },
900 );
901 let schema = schema_builder.build();
902
903 let dir = RamDirectory::new();
904 let config = IndexConfig::default();
905
906 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
908 .await
909 .unwrap();
910
911 for i in 0..30 {
913 let mut doc = Document::new();
914 doc.add_text(title, format!("Document {}", i));
915 let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
917 doc.add_dense_vector(embedding, vec);
918 writer.add_document(doc).unwrap();
919 }
920 writer.commit().await.unwrap();
921
922 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
924 assert!(
925 index.segment_manager.trained().is_none(),
926 "Should not have trained centroids below threshold"
927 );
928
929 let query_vec: Vec<f32> = vec![0.5; 8];
931 let segments = index.segment_readers().await.unwrap();
932 assert!(!segments.is_empty());
933
934 let results = segments[0]
935 .search_dense_vector(
936 embedding,
937 &query_vec,
938 5,
939 0,
940 1,
941 crate::query::MultiValueCombiner::Max,
942 )
943 .await
944 .unwrap();
945 assert!(!results.is_empty(), "Flat search should return results");
946
947 let writer = IndexWriter::open(dir.clone(), config.clone())
949 .await
950 .unwrap();
951
952 for i in 30..60 {
954 let mut doc = Document::new();
955 doc.add_text(title, format!("Document {}", i));
956 let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
957 doc.add_dense_vector(embedding, vec);
958 writer.add_document(doc).unwrap();
959 }
960 writer.commit().await.unwrap();
961
962 writer.build_vector_index().await.unwrap();
964
965 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
967 assert!(
968 index.segment_manager.trained().is_some(),
969 "Should have loaded trained centroids for embedding field"
970 );
971
972 let segments = index.segment_readers().await.unwrap();
974 let results = segments[0]
975 .search_dense_vector(
976 embedding,
977 &query_vec,
978 5,
979 0,
980 1,
981 crate::query::MultiValueCombiner::Max,
982 )
983 .await
984 .unwrap();
985 assert!(
986 !results.is_empty(),
987 "Search should return results after build"
988 );
989
990 let writer = IndexWriter::open(dir.clone(), config.clone())
992 .await
993 .unwrap();
994 writer.build_vector_index().await.unwrap(); assert!(writer.segment_manager.trained().is_some());
998 }
999
1000 #[tokio::test]
1003 async fn test_multi_round_merge_with_search() {
1004 let mut schema_builder = SchemaBuilder::default();
1005 let title = schema_builder.add_text_field("title", true, true);
1006 let body = schema_builder.add_text_field("body", true, true);
1007 let schema = schema_builder.build();
1008
1009 let dir = RamDirectory::new();
1010 let config = IndexConfig {
1011 max_indexing_memory_bytes: 512,
1012 ..Default::default()
1013 };
1014
1015 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1017 .await
1018 .unwrap();
1019
1020 for batch in 0..5 {
1021 for i in 0..10 {
1022 let mut doc = Document::new();
1023 doc.add_text(
1024 title,
1025 format!("alpha bravo charlie batch{} doc{}", batch, i),
1026 );
1027 doc.add_text(
1028 body,
1029 format!("the quick brown fox jumps over the lazy dog number {}", i),
1030 );
1031 writer.add_document(doc).unwrap();
1032 }
1033 writer.flush().await.unwrap();
1034 }
1035 writer.commit().await.unwrap();
1036
1037 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1038 let pre_merge_segments = index.segment_readers().await.unwrap().len();
1039 assert!(
1040 pre_merge_segments >= 3,
1041 "Expected >=3 segments, got {}",
1042 pre_merge_segments
1043 );
1044 assert_eq!(index.num_docs().await.unwrap(), 50);
1045
1046 let results = index.query("alpha", 100).await.unwrap();
1048 assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1049
1050 let results = index.query("fox", 100).await.unwrap();
1051 assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1052
1053 let writer = IndexWriter::open(dir.clone(), config.clone())
1055 .await
1056 .unwrap();
1057 writer.force_merge().await.unwrap();
1058
1059 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1060 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1061 assert_eq!(index.num_docs().await.unwrap(), 50);
1062
1063 let results = index.query("alpha", 100).await.unwrap();
1065 assert_eq!(
1066 results.hits.len(),
1067 50,
1068 "all 50 docs should match 'alpha' after merge 1"
1069 );
1070
1071 let results = index.query("fox", 100).await.unwrap();
1072 assert_eq!(
1073 results.hits.len(),
1074 50,
1075 "all 50 docs should match 'fox' after merge 1"
1076 );
1077
1078 let reader1 = index.reader().await.unwrap();
1080 let searcher1 = reader1.searcher().await.unwrap();
1081 for i in 0..50 {
1082 let doc = searcher1.doc(i).await.unwrap();
1083 assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1084 }
1085
1086 let writer = IndexWriter::open(dir.clone(), config.clone())
1088 .await
1089 .unwrap();
1090 for batch in 0..3 {
1091 for i in 0..10 {
1092 let mut doc = Document::new();
1093 doc.add_text(
1094 title,
1095 format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1096 );
1097 doc.add_text(
1098 body,
1099 format!("the quick brown fox jumps again number {}", i),
1100 );
1101 writer.add_document(doc).unwrap();
1102 }
1103 writer.flush().await.unwrap();
1104 }
1105 writer.commit().await.unwrap();
1106
1107 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1108 assert_eq!(index.num_docs().await.unwrap(), 80);
1109 assert!(
1110 index.segment_readers().await.unwrap().len() >= 2,
1111 "Should have >=2 segments after round 2 ingestion"
1112 );
1113
1114 let results = index.query("fox", 100).await.unwrap();
1116 assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1117
1118 let results = index.query("alpha", 100).await.unwrap();
1119 assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1120
1121 let results = index.query("delta", 100).await.unwrap();
1122 assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1123
1124 let writer = IndexWriter::open(dir.clone(), config.clone())
1126 .await
1127 .unwrap();
1128 writer.force_merge().await.unwrap();
1129
1130 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1131 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1132 assert_eq!(index.num_docs().await.unwrap(), 80);
1133
1134 let results = index.query("fox", 100).await.unwrap();
1136 assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1137
1138 let results = index.query("alpha", 100).await.unwrap();
1139 assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1140
1141 let results = index.query("delta", 100).await.unwrap();
1142 assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1143
1144 let reader2 = index.reader().await.unwrap();
1146 let searcher2 = reader2.searcher().await.unwrap();
1147 for i in 0..80 {
1148 let doc = searcher2.doc(i).await.unwrap();
1149 assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1150 }
1151 }
1152
1153 #[tokio::test]
1156 async fn test_large_scale_merge_correctness() {
1157 let mut schema_builder = SchemaBuilder::default();
1158 let title = schema_builder.add_text_field("title", true, true);
1159 let schema = schema_builder.build();
1160
1161 let dir = RamDirectory::new();
1162 let config = IndexConfig {
1163 max_indexing_memory_bytes: 512,
1164 ..Default::default()
1165 };
1166
1167 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1168 .await
1169 .unwrap();
1170
1171 let total_docs = 200u32;
1174 for batch in 0..8 {
1175 for i in 0..25 {
1176 let mut doc = Document::new();
1177 doc.add_text(
1178 title,
1179 format!("common shared term unique_{} item{}", batch, i),
1180 );
1181 writer.add_document(doc).unwrap();
1182 }
1183 writer.flush().await.unwrap();
1184 }
1185 writer.commit().await.unwrap();
1186
1187 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1189 assert_eq!(index.num_docs().await.unwrap(), total_docs);
1190
1191 let results = index.query("common", 300).await.unwrap();
1192 assert_eq!(
1193 results.hits.len(),
1194 total_docs as usize,
1195 "all docs should match 'common'"
1196 );
1197
1198 for batch in 0..8 {
1200 let q = format!("unique_{}", batch);
1201 let results = index.query(&q, 100).await.unwrap();
1202 assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1203 }
1204
1205 let writer = IndexWriter::open(dir.clone(), config.clone())
1207 .await
1208 .unwrap();
1209 writer.force_merge().await.unwrap();
1210
1211 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1213 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1214 assert_eq!(index.num_docs().await.unwrap(), total_docs);
1215
1216 let results = index.query("common", 300).await.unwrap();
1217 assert_eq!(results.hits.len(), total_docs as usize);
1218
1219 for batch in 0..8 {
1220 let q = format!("unique_{}", batch);
1221 let results = index.query(&q, 100).await.unwrap();
1222 assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1223 }
1224
1225 let reader = index.reader().await.unwrap();
1227 let searcher = reader.searcher().await.unwrap();
1228 for i in 0..total_docs {
1229 let doc = searcher.doc(i).await.unwrap();
1230 assert!(doc.is_some(), "doc {} missing after merge", i);
1231 }
1232 }
1233
1234 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1238 async fn test_auto_merge_triggered() {
1239 use crate::directories::MmapDirectory;
1240 let tmp_dir = tempfile::tempdir().unwrap();
1241 let dir = MmapDirectory::new(tmp_dir.path());
1242
1243 let mut schema_builder = SchemaBuilder::default();
1244 let title = schema_builder.add_text_field("title", true, true);
1245 let body = schema_builder.add_text_field("body", true, true);
1246 let schema = schema_builder.build();
1247
1248 let config = IndexConfig {
1250 max_indexing_memory_bytes: 4096,
1251 num_indexing_threads: 4,
1252 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1253 ..Default::default()
1254 };
1255
1256 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1257 .await
1258 .unwrap();
1259
1260 for batch in 0..12 {
1262 for i in 0..50 {
1263 let mut doc = Document::new();
1264 doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1265 doc.add_text(
1266 body,
1267 format!(
1268 "the quick brown fox jumps over lazy dog number {} round {}",
1269 i, batch
1270 ),
1271 );
1272 writer.add_document(doc).unwrap();
1273 }
1274 writer.flush().await.unwrap();
1275 }
1276 writer.commit().await.unwrap();
1277
1278 let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1279
1280 writer.wait_for_merges().await;
1282
1283 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1285 let segment_count = index.segment_readers().await.unwrap().len();
1286 eprintln!(
1287 "Segments: {} before merge, {} after auto-merge",
1288 pre_merge, segment_count
1289 );
1290 assert!(
1291 segment_count < pre_merge,
1292 "Expected auto-merge to reduce segments from {}, got {}",
1293 pre_merge,
1294 segment_count
1295 );
1296 }
1297
1298 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1303 async fn test_commit_with_vectors_and_background_merge() {
1304 use crate::directories::MmapDirectory;
1305 use crate::dsl::DenseVectorConfig;
1306
1307 let tmp_dir = tempfile::tempdir().unwrap();
1308 let dir = MmapDirectory::new(tmp_dir.path());
1309
1310 let mut schema_builder = SchemaBuilder::default();
1311 let title = schema_builder.add_text_field("title", true, true);
1312 let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1314 let embedding =
1315 schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1316 let schema = schema_builder.build();
1317
1318 let config = IndexConfig {
1320 max_indexing_memory_bytes: 4096,
1321 num_indexing_threads: 4,
1322 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1323 ..Default::default()
1324 };
1325
1326 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1327 .await
1328 .unwrap();
1329
1330 for batch in 0..12 {
1333 for i in 0..5 {
1334 let mut doc = Document::new();
1335 doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1336 let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1338 doc.add_dense_vector(embedding, vec);
1339 writer.add_document(doc).unwrap();
1340 }
1341 writer.flush().await.unwrap();
1342 }
1343
1344 writer.commit().await.unwrap();
1349 writer.wait_for_merges().await;
1350
1351 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1352 let num_docs = index.num_docs().await.unwrap();
1353 assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1354 }
1355
1356 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1359 async fn test_force_merge_many_segments() {
1360 use crate::directories::MmapDirectory;
1361 let tmp_dir = tempfile::tempdir().unwrap();
1362 let dir = MmapDirectory::new(tmp_dir.path());
1363
1364 let mut schema_builder = SchemaBuilder::default();
1365 let title = schema_builder.add_text_field("title", true, true);
1366 let schema = schema_builder.build();
1367
1368 let config = IndexConfig {
1369 max_indexing_memory_bytes: 512,
1370 ..Default::default()
1371 };
1372
1373 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1374 .await
1375 .unwrap();
1376
1377 for batch in 0..50 {
1379 for i in 0..3 {
1380 let mut doc = Document::new();
1381 doc.add_text(title, format!("term_{} batch_{}", i, batch));
1382 writer.add_document(doc).unwrap();
1383 }
1384 writer.flush().await.unwrap();
1385 }
1386 writer.commit().await.unwrap();
1387 writer.wait_for_merges().await;
1389
1390 let seg_ids = writer.segment_manager.get_segment_ids().await;
1391 let pre = seg_ids.len();
1392 eprintln!("Segments before force_merge: {}", pre);
1393 assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1394
1395 writer.force_merge().await.unwrap();
1397
1398 let index2 = Index::open(dir, config).await.unwrap();
1399 let post = index2.segment_readers().await.unwrap().len();
1400 eprintln!("Segments after force_merge: {}", post);
1401 assert_eq!(post, 1);
1402 assert_eq!(index2.num_docs().await.unwrap(), 150);
1403 }
1404}