#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;

mod searcher;
pub use searcher::Searcher;

#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

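/// Default filename for the on-disk slice cache.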
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

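/// Tuning knobs for an [`Index`]: thread counts, cache sizes, the indexing
/// memory budget, merge behavior, and reader reload cadence.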
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Total number of worker threads.
    pub num_threads: usize,
    /// Threads used by the indexing pipeline.
    pub num_indexing_threads: usize,
    /// Threads used for block compression.
    pub num_compression_threads: usize,
    /// Number of cached blocks for the term dictionary.
    pub term_cache_blocks: usize,
    /// Number of cached blocks for the document store.
    pub store_cache_blocks: usize,
    /// Memory budget, in bytes, for buffered documents before a segment flush.
    pub max_indexing_memory_bytes: usize,
    /// Policy that decides which segments to merge in the background.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Structural optimization settings for the index.
    pub optimization: crate::structures::IndexOptimization,
    /// How often the cached reader polls for new segments, in milliseconds.
    pub reload_interval_ms: u64,
    /// Maximum number of merges allowed to run concurrently.
    pub max_concurrent_merges: usize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let indexing_threads = crate::default_indexing_threads();
        #[cfg(not(feature = "native"))]
        let indexing_threads = 1;

        #[cfg(feature = "native")]
        let compression_threads = crate::default_compression_threads();
        #[cfg(not(feature = "native"))]
        let compression_threads = 1;

        Self {
            num_threads: indexing_threads,
            num_indexing_threads: 1,
            num_compression_threads: compression_threads,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: 256 * 1024 * 1024,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
            reload_interval_ms: 1000,
            max_concurrent_merges: 4,
        }
    }
}

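/// A search index over a [`DirectoryWriter`](crate::directories::DirectoryWriter):
/// owns the schema, configuration, and segment manager, and hands out a
/// cached [`IndexReader`].
///
/// A minimal end-to-end sketch (assuming the `RamDirectory` backend used in
/// the tests below):
///
/// ```ignore
/// let mut builder = SchemaBuilder::default();
/// let title = builder.add_text_field("title", true, true);
/// let index = Index::create(RamDirectory::new(), builder.build(), IndexConfig::default())
///     .await?;
/// let response = index.query("title:hello", 10).await?;
/// assert!(response.hits.len() <= 10);
/// ```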
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
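    /// Creates a new, empty index in `directory` with the given schema and
    /// persists its initial metadata.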
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Write the initial (empty) metadata so the index can be reopened.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

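    /// Opens an existing index, reading the schema from the persisted
    /// [`IndexMetadata`].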
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Publish any previously trained vector-index data to readers.
        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }

    pub fn directory(&self) -> &D {
        &self.directory
    }

    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

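    /// Returns the shared [`IndexReader`], creating it on first access.
    /// The reader is cached, and refreshes its view of the segments every
    /// `reload_interval_ms` milliseconds.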
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

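    /// Fields searched when a query does not name one: the schema's
    /// configured default fields, or, failing that, every indexed text field.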
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        // If the schema defines query-routing rules, build a parser that
        // routes terms to fields; otherwise fall back to the plain parser.
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

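    /// Parses `query_str` with [`Self::query_parser`] and returns the top
    /// `limit` hits; shorthand for [`Self::query_offset`] with offset 0.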
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

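    /// Runs `query` against all segments concurrently, merges per-segment
    /// results by descending score, and returns hits `offset..offset + limit`.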
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

        // Each segment must return enough results to cover the offset.
        let fetch_limit = offset + limit;

        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        // Sort by descending score; incomparable (NaN) scores rank as equal.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

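    /// Collects the posting list for `term` in `field` from every segment
    /// that contains it.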
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
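    /// Returns an [`IndexWriter`] sharing this index's directory, schema, and
    /// configuration.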
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::{Document, SchemaBuilder};

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // "world" appears in both documents' titles.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        // Every document should survive the merge.
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }

    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4),
                nprobe: 2,
                // The IVF index is only built once 50 vectors are present.
                build_threshold: Some(50),
                unit_norm: false,
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 30 documents: below the build threshold of 50.
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // 30 more documents push the total past the build threshold.
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        writer.build_vector_index().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // A second build on an already-trained index should leave it trained.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.segment_manager.trained().is_some());
    }

    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }

    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 8 batches of 25 documents each.
        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        for i in 0..total_docs {
            let doc = searcher.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_auto_merge_triggered() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        // An aggressive merge policy so background merges trigger quickly.
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..50 {
                let mut doc = Document::new();
                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
                doc.add_text(
                    body,
                    format!(
                        "the quick brown fox jumps over lazy dog number {} round {}",
                        i, batch
                    ),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_merge = writer.segment_manager.get_segment_ids().await.len();

        writer.wait_for_merging_thread().await;
        writer.maybe_merge().await;
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let segment_count = index.segment_readers().await.unwrap().len();
        eprintln!(
            "Segments: {} before merge, {} after auto-merge",
            pre_merge, segment_count
        );
        assert!(
            segment_count < pre_merge,
            "Expected auto-merge to reduce segments from {}, got {}",
            pre_merge,
            segment_count
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
                doc.add_dense_vector(embedding, vec);
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let num_docs = index.num_docs().await.unwrap();
        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_force_merge_many_segments() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..50 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("term_{} batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let seg_ids = writer.segment_manager.get_segment_ids().await;
        let pre = seg_ids.len();
        eprintln!("Segments before force_merge: {}", pre);
        assert!(pre >= 2, "Expected multiple segments, got {}", pre);

        writer.force_merge().await.unwrap();

        let index2 = Index::open(dir, config).await.unwrap();
        let post = index2.segment_readers().await.unwrap().len();
        eprintln!("Segments after force_merge: {}", post);
        assert_eq!(post, 1);
        assert_eq!(index2.num_docs().await.unwrap(), 150);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_background_merge_generation() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..15 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let metas = writer
            .segment_manager
            .read_metadata(|m| m.segment_metas.clone())
            .await;

        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Segments after merge: {}, max generation: {}",
            metas.len(),
            max_gen
        );

        assert!(
            max_gen >= 1,
            "Expected at least one merged segment (gen >= 1), got max_gen={}",
            max_gen
        );

        // Merged segments must record their ancestors; fresh segments must not.
        for (id, info) in &metas {
            if info.generation > 0 {
                assert!(
                    !info.ancestors.is_empty(),
                    "Segment {} has gen={} but no ancestors",
                    id,
                    info.generation
                );
            } else {
                assert!(
                    info.ancestors.is_empty(),
                    "Fresh segment {} has gen=0 but has ancestors",
                    id
                );
            }
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_merge_preserves_all_documents() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 1200;
        let docs_per_batch = 60;
        let batches = total_docs / docs_per_batch;

        for batch in 0..batches {
            for i in 0..docs_per_batch {
                let doc_num = batch * docs_per_batch + i;
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("uid_{} common_term batch_{}", doc_num, batch),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
        assert!(
            pre_segments >= 2,
            "Need multiple segments, got {}",
            pre_segments
        );

        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(
            index.num_docs().await.unwrap(),
            total_docs as u32,
            "Doc count mismatch after force_merge"
        );

        let results = index.query("common_term", total_docs + 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs,
            "common_term should match all docs"
        );

        for check in [0, 1, total_docs / 2, total_docs - 1] {
            let q = format!("uid_{}", check);
            let results = index.query(&q, 10).await.unwrap();
            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multi_round_merge_doc_integrity() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut expected_total = 0u64;

        for round in 0..4 {
            // Each round indexes progressively more documents.
            let docs_this_round = 50 + round * 25;
            for batch in 0..5 {
                for i in 0..docs_this_round / 5 {
                    let mut doc = Document::new();
                    doc.add_text(
                        title,
                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
                    );
                    writer.add_document(doc).unwrap();
                }
                writer.commit().await.unwrap();
            }
            writer.wait_for_merging_thread().await;

            expected_total += docs_this_round as u64;

            let actual = writer
                .segment_manager
                .read_metadata(|m| {
                    m.segment_metas
                        .values()
                        .map(|s| s.num_docs as u64)
                        .sum::<u64>()
                })
                .await;

            assert_eq!(
                actual, expected_total,
                "Round {}: expected {} docs, metadata reports {}",
                round, expected_total, actual
            );
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);

        let results = index
            .query("searchable", expected_total as usize + 100)
            .await
            .unwrap();
        assert_eq!(
            results.hits.len(),
            expected_total as usize,
            "All docs should match 'searchable'"
        );

        let metas = index
            .segment_manager()
            .read_metadata(|m| m.segment_metas.clone())
            .await;
        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Final: {} segments, {} docs, max generation={}",
            metas.len(),
            expected_total,
            max_gen
        );
        assert!(
            max_gen >= 1,
            "Multiple merge rounds should produce gen >= 1"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_segment_count_bounded_during_sustained_indexing() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, false);
        let schema = schema_builder.build();

        let policy = crate::merge::TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            tier_factor: 10.0,
            tier_floor: 50,
            max_merged_docs: 1_000_000,
        };

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 1,
            merge_policy: Box::new(policy),
            max_concurrent_merges: 4,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_commits = 40;
        let docs_per_commit = 30;
        let total_docs = num_commits * docs_per_commit;
        let mut max_segments_seen = 0usize;

        for commit_idx in 0..num_commits {
            for i in 0..docs_per_commit {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("doc_{} text", commit_idx * docs_per_commit + i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();

            // Give background merges a chance to run between commits.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let seg_count = writer.segment_manager.get_segment_ids().await.len();
            max_segments_seen = max_segments_seen.max(seg_count);
        }

        writer.wait_for_all_merges().await;

        let final_segments = writer.segment_manager.get_segment_ids().await.len();
        let final_docs: u64 = writer
            .segment_manager
            .read_metadata(|m| {
                m.segment_metas
                    .values()
                    .map(|s| s.num_docs as u64)
                    .sum::<u64>()
            })
            .await;

        eprintln!(
            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
            num_commits, total_docs, final_segments, max_segments_seen
        );

        // If merging keeps pace, the live segment count should stay well below
        // one segment per commit.
        let max_allowed = num_commits / 2;
        assert!(
            max_segments_seen <= max_allowed,
            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
             Merging is not keeping up.",
            max_segments_seen,
            max_allowed,
            num_commits
        );

        assert!(
            final_segments <= 10,
            "After all merges, expected ≤10 segments, got {}",
            final_segments
        );

        assert_eq!(
            final_docs, total_docs as u64,
            "Expected {} docs, metadata reports {}",
            total_docs, final_docs
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_single_segment() {
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay document number {}", i));
            doc.add_text(
                body,
                "common words repeated across all hay documents filler text",
            );
            writer.add_document(doc).unwrap();
        }

        // Bury one needle document in the middle of the haystack.
        let mut needle = Document::new();
        needle.add_text(title, "The unique needle xylophone");
        needle.add_text(
            body,
            "This document contains the extraordinary term xylophone",
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay document {}", i));
            doc.add_text(body, "common words filler text again and again");
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let results = index.query("xylophone", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
        assert!(results.hits[0].score > 0.0, "Score should be positive");

        let doc = index
            .get_document(&results.hits[0].address)
            .await
            .unwrap()
            .unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert!(
            title_val.contains("xylophone"),
            "Retrieved doc should be the needle"
        );

        let results = index.query("common", 200).await.unwrap();
        assert!(
            results.hits.len() >= 100,
            "Common term should match many docs"
        );

        let results = index.query("nonexistentterm99999", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            0,
            "Non-existent term should match nothing"
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_multi_segment() {
        use crate::query::TermQuery;

        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment one hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // The needle lands at the start of the second segment.
        let mut needle = Document::new();
        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
        writer.add_document(needle).unwrap();
        for i in 0..49 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment two hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment three hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 150);
        let num_segments = index.segment_readers().await.unwrap().len();
        assert!(
            num_segments >= 2,
            "Should have multiple segments, got {}",
            num_segments
        );

        let results = index.query("quetzalcoatl", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            1,
            "Should find exactly 1 needle across segments"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let tq = TermQuery::text(content, "quetzalcoatl");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "TermQuery should also find the needle");

        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let text = doc.get_first(content).unwrap().as_text().unwrap();
        assert!(
            text.contains("quetzalcoatl"),
            "Should retrieve needle content"
        );

        let results = index.query("document", 200).await.unwrap();
        assert!(
            results.hits.len() >= 149,
            "Should find hay docs across all segments"
        );
    }

    #[tokio::test]
    async fn test_needle_sparse_vector() {
        use crate::query::SparseVectorQuery;

        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10)
                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
                .collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        // Only the needle uses dimensions 1000-1002.
        let mut needle = Document::new();
        needle.add_text(title, "Needle sparse document");
        needle.add_sparse_vector(
            sparse,
            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
        assert!(results[0].score > 0.0, "Needle score should be positive");

        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(title_val, "Needle sparse document");

        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
        let results = searcher.search(&query_shared, 200).await.unwrap();
        assert!(
            results.len() >= 100,
            "Shared dim 5 should match many docs, got {}",
            results.len()
        );

        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
        let results = searcher.search(&query_missing, 10).await.unwrap();
        assert_eq!(
            results.len(),
            0,
            "Non-existent dimension should match nothing"
        );
    }

2003 #[tokio::test]
2005 async fn test_needle_sparse_vector_multi_segment_merge() {
2006 use crate::query::SparseVectorQuery;
2007
2008 let mut sb = SchemaBuilder::default();
2009 let title = sb.add_text_field("title", true, true);
2010 let sparse = sb.add_sparse_vector_field("sparse", true, true);
2011 let schema = sb.build();
2012
2013 let dir = RamDirectory::new();
2014 let config = IndexConfig::default();
2015 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2016 .await
2017 .unwrap();
2018
2019 for i in 0..30 {
2021 let mut doc = Document::new();
2022 doc.add_text(title, format!("seg1 hay {}", i));
2023 doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2024 writer.add_document(doc).unwrap();
2025 }
2026 writer.commit().await.unwrap();
2027
2028 let mut needle = Document::new();
2030 needle.add_text(title, "seg2 needle");
2031 needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
2032 writer.add_document(needle).unwrap();
2033 for i in 0..29 {
2034 let mut doc = Document::new();
2035 doc.add_text(title, format!("seg2 hay {}", i));
2036 doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
2037 writer.add_document(doc).unwrap();
2038 }
2039 writer.commit().await.unwrap();
2040
2041 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2043 assert_eq!(index.num_docs().await.unwrap(), 60);
2044
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );

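        // Force-merge everything into a single segment.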
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );
    }

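    // Exact-match needle retrieval against a flat dense vector index.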
    #[tokio::test]
    async fn test_needle_dense_vector_flat() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::DenseVectorQuery;

        let dim = 16;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
                unit_norm: false,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

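        // Hay documents get small pseudo-random vectors, far from the needle.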
        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay dense doc {}", i));
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

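        // The needle is an all-ones vector, unlike any hay vector.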
        let mut needle = Document::new();
        needle.add_text(title, "Needle dense document");
        let needle_vec: Vec<f32> = vec![1.0; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 101);

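        // Searching with the needle's exact vector should rank it first.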
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&query, 5).await.unwrap();
        assert!(!results.is_empty(), "Should find at least 1 result");

        let top_doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(
            top_title, "Needle dense document",
            "Top result should be the needle (exact vector match)"
        );
        assert!(
            results[0].score > 0.9,
            "Exact match should have very high cosine similarity, got {}",
            results[0].score
        );
    }

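    // One needle, three modalities: full-text, sparse, and dense all point at it.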
    #[tokio::test]
    async fn test_needle_combined_all_modalities() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};

        let dim = 8;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
                unit_norm: false,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

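        // Hay: shared filler text, low sparse dims, small dense vectors.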
        for i in 0..80u32 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay doc {}", i));
            doc.add_text(body, "general filler text about nothing special");
            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

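        // The needle is unique in every modality: the word "rhinoceros",
        // sparse dims 9998-9999, and a distinctive dense vector.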
        let mut needle = Document::new();
        needle.add_text(title, "The extraordinary rhinoceros");
        needle.add_text(
            body,
            "This document about rhinoceros is the only one with this word",
        );
        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
        let needle_vec = vec![0.9; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 81);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();

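        // Full-text: only the needle's body contains "rhinoceros".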
        let tq = TermQuery::text(body, "rhinoceros");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(
            results.len(),
            1,
            "Full-text: should find exactly the needle"
        );
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

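        // Sparse: only the needle has dims 9998-9999.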
        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
        let results = searcher.search(&sq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

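        // Dense: the needle's own vector should be the top-1 match.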
        let dq = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&dq, 1).await.unwrap();
        assert!(!results.is_empty(), "Dense: should find at least 1 result");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "The extraordinary rhinoceros",
            "Dense: top-1 should be the needle"
        );

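        // All three modalities must agree on the same doc_id.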
        let ft_doc_id = {
            let tq = TermQuery::text(body, "rhinoceros");
            let r = searcher.search(&tq, 1).await.unwrap();
            r[0].doc_id
        };
        let sp_doc_id = {
            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
            let r = searcher.search(&sq, 1).await.unwrap();
            r[0].doc_id
        };
        let dn_doc_id = {
            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
            let r = searcher.search(&dq, 1).await.unwrap();
            r[0].doc_id
        };

        assert_eq!(
            ft_doc_id, sp_doc_id,
            "Full-text and sparse should find same doc"
        );
        assert_eq!(
            sp_doc_id, dn_doc_id,
            "Sparse and dense should find same doc"
        );
    }

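    // Twenty unique needle terms spread over four commits must each be retrievable.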
    #[tokio::test]
    async fn test_many_needles_all_found() {
        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_needles = 20usize;
        let hay_per_batch = 50usize;
        let needle_terms: Vec<String> = (0..num_needles)
            .map(|i| format!("uniqueneedle{:04}", i))
            .collect();

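        // Four commits, each mixing 50 hay docs with 5 unique needles.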
        for batch in 0..4 {
            for i in 0..hay_per_batch {
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("hay batch {} item {} common filler", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            for n in 0..5 {
                let needle_idx = batch * 5 + n;
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("this is {} among many documents", needle_terms[needle_idx]),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        let total = index.num_docs().await.unwrap();
        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);

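        // Every needle term must be found exactly once across all segments.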
        for term in &needle_terms {
            let results = index.query(term, 10).await.unwrap();
            assert_eq!(
                results.hits.len(),
                1,
                "Should find exactly 1 doc for needle '{}'",
                term
            );
        }

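        // The shared filler term should match every hay document.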
        let results = index.query("common", 500).await.unwrap();
        assert_eq!(
            results.hits.len(),
            hay_per_batch * 4,
            "Common term should match all {} hay docs",
            hay_per_batch * 4
        );
    }
}