#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;

mod searcher;
pub use searcher::Searcher;

#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

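/// Tuning knobs for indexing, caching, merging, and reader reload.
///
/// Every field has a usable default, so callers typically override only a
/// few of them with struct-update syntax. A minimal sketch (the tests below
/// do the same, shrinking the memory budget to force small segments):
///
/// ```ignore
/// let config = IndexConfig {
///     max_indexing_memory_bytes: 64 * 1024 * 1024,
///     max_concurrent_merges: 2,
///     ..Default::default()
/// };
/// ```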
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Total worker threads available to the index.
    pub num_threads: usize,
    /// Threads used for indexing documents.
    pub num_indexing_threads: usize,
    /// Threads used for compressing stored data.
    pub num_compression_threads: usize,
    /// Cache capacity (in blocks) for the term dictionary.
    pub term_cache_blocks: usize,
    /// Cache capacity (in blocks) for the document store.
    pub store_cache_blocks: usize,
    /// Memory budget for buffered documents before a segment is flushed.
    pub max_indexing_memory_bytes: usize,
    /// Policy deciding which segments get merged.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Structural optimization settings for the index.
    pub optimization: crate::structures::IndexOptimization,
    /// How often readers re-check for newly published segments.
    pub reload_interval_ms: u64,
    /// Upper bound on merges running concurrently.
    pub max_concurrent_merges: usize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let indexing_threads = crate::default_indexing_threads();
        #[cfg(not(feature = "native"))]
        let indexing_threads = 1;

        #[cfg(feature = "native")]
        let compression_threads = crate::default_compression_threads();
        #[cfg(not(feature = "native"))]
        let compression_threads = 1;

        Self {
            num_threads: indexing_threads,
            num_indexing_threads: 1,
            num_compression_threads: compression_threads,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: 256 * 1024 * 1024,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
            reload_interval_ms: 1000,
            max_concurrent_merges: 4,
        }
    }
}

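/// A searchable index stored in a directory: it owns the schema, the
/// configuration, and the segment manager, and hands out a cached reader on
/// demand.
///
/// A typical round trip, as a sketch assembled from the tests below
/// (`RamDirectory` is one of the available directory backends):
///
/// ```ignore
/// let dir = RamDirectory::new();
/// let config = IndexConfig::default();
/// let mut writer = IndexWriter::create(dir.clone(), schema, config.clone()).await?;
/// writer.add_document(doc)?;
/// writer.commit().await?;
///
/// let index = Index::open(dir, config).await?;
/// let response = index.query("title:hello", 10).await?;
/// ```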
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Lazily initialized reader shared by all callers of [`Self::reader`].
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates a new, empty index in `directory` with the given schema.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Persist the initial (empty) metadata so the index can be reopened.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Opens an existing index, reading the schema from stored metadata.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Publish any previously trained vector-index data (e.g. centroids).
        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }

    pub fn directory(&self) -> &D {
        &self.directory
    }

    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

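    /// Returns the shared [`IndexReader`], creating it on first use.
    ///
    /// The reader is memoized in a `OnceCell`, so all callers observe the
    /// same instance; it is constructed with `reload_interval_ms`, the
    /// cadence at which it re-checks the segment manager for new segments.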
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

    /// Fields searched when a query does not name a field explicitly: the
    /// schema's configured default fields, or else every indexed text field.
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        // Use a field router when the schema defines routing rules.
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

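    /// Parses `query_str` with [`Self::query_parser`] and executes it. Bare
    /// terms (searched across the default fields) and `field:term` syntax
    /// are both accepted, as the tests below exercise:
    ///
    /// ```ignore
    /// let hits = index.query("uris:one", 10).await?.hits;
    /// ```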
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

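    /// Core search path: every segment is searched concurrently for the top
    /// `offset + limit` hits, the per-segment results are merged into one
    /// list sorted by descending score, and only then are `offset` and
    /// `limit` applied. `total_hits` therefore counts all collected
    /// candidates, not just the returned page.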
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

        // Each segment must return enough hits to cover the global offset.
        let fetch_limit = offset + limit;

        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        // Merge per-segment results into a single descending-score ranking.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

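    /// Fetches the raw posting list for `term` in `field` from every
    /// segment, returning one `(segment, postings)` pair per segment in
    /// which the term occurs.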
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates an [`IndexWriter`] sharing this index's directory, schema,
    /// and segment manager.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::{Document, SchemaBuilder};

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // "world" appears in both titles: one posting list, two documents.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        // A tiny memory budget forces a flush (a new segment) per commit.
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        // Every document must survive the merge.
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

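    // Exercises the top-k path for disjunctive (OR) queries: expected result
    // sets for should/must/must_not combinations, plus truncation to the
    // requested limit.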
    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }

    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }

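    // Below `build_threshold` the dense-vector field is served by a flat
    // (exact) scan and no centroids exist; once enough vectors accumulate,
    // `build_vector_index` trains centroids and the IVF index takes over.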
    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4),
                nprobe: 2,
                build_threshold: Some(50),
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 30 vectors: still below the build threshold of 50.
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // 30 more vectors push the total past the threshold.
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        writer.build_vector_index().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // A second build on the already-built index keeps the trained state.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.segment_manager.trained().is_some());
    }

    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }

    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        for i in 0..total_docs {
            let doc = searcher.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_auto_merge_triggered() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..50 {
                let mut doc = Document::new();
                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
                doc.add_text(
                    body,
                    format!(
                        "the quick brown fox jumps over lazy dog number {} round {}",
                        i, batch
                    ),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_merge = writer.segment_manager.get_segment_ids().await.len();

        writer.wait_for_merging_thread().await;
        writer.maybe_merge().await;
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let segment_count = index.segment_readers().await.unwrap().len();
        eprintln!(
            "Segments: {} before merge, {} after auto-merge",
            pre_merge, segment_count
        );
        assert!(
            segment_count < pre_merge,
            "Expected auto-merge to reduce segments from {}, got {}",
            pre_merge,
            segment_count
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
                doc.add_dense_vector(embedding, vec);
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let num_docs = index.num_docs().await.unwrap();
        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_force_merge_many_segments() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..50 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("term_{} batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let seg_ids = writer.segment_manager.get_segment_ids().await;
        let pre = seg_ids.len();
        eprintln!("Segments before force_merge: {}", pre);
        assert!(pre >= 2, "Expected multiple segments, got {}", pre);

        writer.force_merge().await.unwrap();

        let index2 = Index::open(dir, config).await.unwrap();
        let post = index2.segment_readers().await.unwrap().len();
        eprintln!("Segments after force_merge: {}", post);
        assert_eq!(post, 1);
        assert_eq!(index2.num_docs().await.unwrap(), 150);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_background_merge_generation() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..15 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let metas = writer
            .segment_manager
            .read_metadata(|m| m.segment_metas.clone())
            .await;

        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Segments after merge: {}, max generation: {}",
            metas.len(),
            max_gen
        );

        assert!(
            max_gen >= 1,
            "Expected at least one merged segment (gen >= 1), got max_gen={}",
            max_gen
        );

        // Merged segments must record their ancestry; fresh segments none.
        for (id, info) in &metas {
            if info.generation > 0 {
                assert!(
                    !info.ancestors.is_empty(),
                    "Segment {} has gen={} but no ancestors",
                    id,
                    info.generation
                );
            } else {
                assert!(
                    info.ancestors.is_empty(),
                    "Fresh segment {} has gen=0 but has ancestors",
                    id
                );
            }
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_merge_preserves_all_documents() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 1200;
        let docs_per_batch = 60;
        let batches = total_docs / docs_per_batch;

        for batch in 0..batches {
            for i in 0..docs_per_batch {
                let doc_num = batch * docs_per_batch + i;
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("uid_{} common_term batch_{}", doc_num, batch),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
        assert!(
            pre_segments >= 2,
            "Need multiple segments, got {}",
            pre_segments
        );

        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(
            index.num_docs().await.unwrap(),
            total_docs as u32,
            "Doc count mismatch after force_merge"
        );

        let results = index.query("common_term", total_docs + 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs,
            "common_term should match all docs"
        );

        // Spot-check unique ids at the edges and middle of the range.
        for check in [0, 1, total_docs / 2, total_docs - 1] {
            let q = format!("uid_{}", check);
            let results = index.query(&q, 10).await.unwrap();
            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multi_round_merge_doc_integrity() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut expected_total = 0u64;

        for round in 0..4 {
            let docs_this_round = 50 + round * 25;
            for batch in 0..5 {
                for i in 0..docs_this_round / 5 {
                    let mut doc = Document::new();
                    doc.add_text(
                        title,
                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
                    );
                    writer.add_document(doc).unwrap();
                }
                writer.commit().await.unwrap();
            }
            writer.wait_for_merging_thread().await;

            expected_total += docs_this_round as u64;

            let actual = writer
                .segment_manager
                .read_metadata(|m| {
                    m.segment_metas
                        .values()
                        .map(|s| s.num_docs as u64)
                        .sum::<u64>()
                })
                .await;

            assert_eq!(
                actual, expected_total,
                "Round {}: expected {} docs, metadata reports {}",
                round, expected_total, actual
            );
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);

        let results = index
            .query("searchable", expected_total as usize + 100)
            .await
            .unwrap();
        assert_eq!(
            results.hits.len(),
            expected_total as usize,
            "All docs should match 'searchable'"
        );

        let metas = index
            .segment_manager()
            .read_metadata(|m| m.segment_metas.clone())
            .await;
        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Final: {} segments, {} docs, max generation={}",
            metas.len(),
            expected_total,
            max_gen
        );
        assert!(
            max_gen >= 1,
            "Multiple merge rounds should produce gen >= 1"
        );
    }

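    // Sustained-indexing check: with a tiered policy, background merging
    // must keep the live segment count bounded while commits keep arriving,
    // and no documents may be lost along the way.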
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_segment_count_bounded_during_sustained_indexing() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, false);
        let schema = schema_builder.build();

        let policy = crate::merge::TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            tier_factor: 10.0,
            tier_floor: 50,
            max_merged_docs: 1_000_000,
        };

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 1,
            merge_policy: Box::new(policy),
            max_concurrent_merges: 4,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_commits = 40;
        let docs_per_commit = 30;
        let total_docs = num_commits * docs_per_commit;
        let mut max_segments_seen = 0usize;

        for commit_idx in 0..num_commits {
            for i in 0..docs_per_commit {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("doc_{} text", commit_idx * docs_per_commit + i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();

            // Give background merges a chance to run between commits.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let seg_count = writer.segment_manager.get_segment_ids().await.len();
            max_segments_seen = max_segments_seen.max(seg_count);
        }

        writer.wait_for_all_merges().await;

        let final_segments = writer.segment_manager.get_segment_ids().await.len();
        let final_docs: u64 = writer
            .segment_manager
            .read_metadata(|m| {
                m.segment_metas
                    .values()
                    .map(|s| s.num_docs as u64)
                    .sum::<u64>()
            })
            .await;

        eprintln!(
            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
            num_commits, total_docs, final_segments, max_segments_seen
        );

        let max_allowed = num_commits / 2;
        assert!(
            max_segments_seen <= max_allowed,
            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
             Merging is not keeping up.",
            max_segments_seen,
            max_allowed,
            num_commits
        );

        assert!(
            final_segments <= 10,
            "After all merges, expected ≤10 segments, got {}",
            final_segments
        );

        assert_eq!(
            final_docs, total_docs as u64,
            "Expected {} docs, metadata reports {}",
            total_docs, final_docs
        );
    }

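    // "Needle" tests: bury one distinctive document among filler ("hay")
    // documents and verify each retrieval path (full text, sparse vectors,
    // dense vectors) surfaces exactly that document.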
    #[tokio::test]
    async fn test_needle_fulltext_single_segment() {
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay document number {}", i));
            doc.add_text(
                body,
                "common words repeated across all hay documents filler text",
            );
            writer.add_document(doc).unwrap();
        }

        let mut needle = Document::new();
        needle.add_text(title, "The unique needle xylophone");
        needle.add_text(
            body,
            "This document contains the extraordinary term xylophone",
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay document {}", i));
            doc.add_text(body, "common words filler text again and again");
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let results = index.query("xylophone", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
        assert!(results.hits[0].score > 0.0, "Score should be positive");

        let doc = index
            .get_document(&results.hits[0].address)
            .await
            .unwrap()
            .unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert!(
            title_val.contains("xylophone"),
            "Retrieved doc should be the needle"
        );

        let results = index.query("common", 200).await.unwrap();
        assert!(
            results.hits.len() >= 100,
            "Common term should match many docs"
        );

        let results = index.query("nonexistentterm99999", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            0,
            "Non-existent term should match nothing"
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_multi_segment() {
        use crate::query::TermQuery;

        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment one hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let mut needle = Document::new();
        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
        writer.add_document(needle).unwrap();
        for i in 0..49 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment two hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment three hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 150);
        let num_segments = index.segment_readers().await.unwrap().len();
        assert!(
            num_segments >= 2,
            "Should have multiple segments, got {}",
            num_segments
        );

        let results = index.query("quetzalcoatl", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            1,
            "Should find exactly 1 needle across segments"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let tq = TermQuery::text(content, "quetzalcoatl");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "TermQuery should also find the needle");

        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let text = doc.get_first(content).unwrap().as_text().unwrap();
        assert!(
            text.contains("quetzalcoatl"),
            "Should retrieve needle content"
        );

        let results = index.query("document", 200).await.unwrap();
        assert!(
            results.hits.len() >= 149,
            "Should find hay docs across all segments"
        );
    }

    #[tokio::test]
    async fn test_needle_sparse_vector() {
        use crate::query::SparseVectorQuery;

        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10)
                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
                .collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        // The needle is the only document with dimensions 1000-1002.
        let mut needle = Document::new();
        needle.add_text(title, "Needle sparse document");
        needle.add_sparse_vector(
            sparse,
            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
        assert!(results[0].score > 0.0, "Needle score should be positive");

        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(title_val, "Needle sparse document");

        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
        let results = searcher.search(&query_shared, 200).await.unwrap();
        assert!(
            results.len() >= 100,
            "Shared dim 5 should match many docs, got {}",
            results.len()
        );

        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
        let results = searcher.search(&query_missing, 10).await.unwrap();
        assert_eq!(
            results.len(),
            0,
            "Non-existent dimension should match nothing"
        );
    }

2002 #[tokio::test]
2004 async fn test_needle_sparse_vector_multi_segment_merge() {
2005 use crate::query::SparseVectorQuery;
2006
2007 let mut sb = SchemaBuilder::default();
2008 let title = sb.add_text_field("title", true, true);
2009 let sparse = sb.add_sparse_vector_field("sparse", true, true);
2010 let schema = sb.build();
2011
2012 let dir = RamDirectory::new();
2013 let config = IndexConfig::default();
2014 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2015 .await
2016 .unwrap();
2017
2018 for i in 0..30 {
2020 let mut doc = Document::new();
2021 doc.add_text(title, format!("seg1 hay {}", i));
2022 doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2023 writer.add_document(doc).unwrap();
2024 }
2025 writer.commit().await.unwrap();
2026
2027 let mut needle = Document::new();
2029 needle.add_text(title, "seg2 needle");
2030 needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
2031 writer.add_document(needle).unwrap();
2032 for i in 0..29 {
2033 let mut doc = Document::new();
2034 doc.add_text(title, format!("seg2 hay {}", i));
2035 doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
2036 writer.add_document(doc).unwrap();
2037 }
2038 writer.commit().await.unwrap();
2039
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );

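        // Force-merge down to a single segment and search again.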
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );
    }

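    // A dense-vector needle indexed with a flat (brute-force) index should be
    // the top hit for an exact query vector.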
    #[tokio::test]
    async fn test_needle_dense_vector_flat() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::DenseVectorQuery;

        let dim = 16;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

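        // Hay: low-magnitude pseudo-random vectors well away from the needle.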
        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay dense doc {}", i));
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

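        // Needle: an all-ones vector, unlike anything in the hay.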
        let mut needle = Document::new();
        needle.add_text(title, "Needle dense document");
        let needle_vec: Vec<f32> = vec![1.0; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 101);

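        // Searching with the exact needle vector should rank the needle first.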
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&query, 5).await.unwrap();
        assert!(!results.is_empty(), "Should find at least 1 result");

        let top_doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(
            top_title, "Needle dense document",
            "Top result should be the needle (exact vector match)"
        );
        assert!(
            results[0].score > 0.9,
            "Exact match should have very high cosine similarity, got {}",
            results[0].score
        );
    }

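    // One needle document carrying a unique term, unique sparse dimensions, and
    // a distinctive dense vector: full-text, sparse, and dense queries should
    // all converge on the same document.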
    #[tokio::test]
    async fn test_needle_combined_all_modalities() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};

        let dim = 8;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

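        // Hay docs populate all three modalities with unremarkable values.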
        for i in 0..80u32 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay doc {}", i));
            doc.add_text(body, "general filler text about nothing special");
            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

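        // Needle: unique term, unique sparse dims, distinctive dense vector.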
        let mut needle = Document::new();
        needle.add_text(title, "The extraordinary rhinoceros");
        needle.add_text(
            body,
            "This document about rhinoceros is the only one with this word",
        );
        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
        let needle_vec = vec![0.9; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 81);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();

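        // Full-text: the unique term appears in exactly one document.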
        let tq = TermQuery::text(body, "rhinoceros");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(
            results.len(),
            1,
            "Full-text: should find exactly the needle"
        );
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

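        // Sparse: only the needle carries dims 9998-9999.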
        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
        let results = searcher.search(&sq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

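        // Dense: the exact needle vector should rank the needle first.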
        let dq = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&dq, 1).await.unwrap();
        assert!(!results.is_empty(), "Dense: should find at least 1 result");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "The extraordinary rhinoceros",
            "Dense: top-1 should be the needle"
        );

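        // All three modalities should agree on the needle's doc id.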
        let ft_doc_id = {
            let tq = TermQuery::text(body, "rhinoceros");
            let r = searcher.search(&tq, 1).await.unwrap();
            r[0].doc_id
        };
        let sp_doc_id = {
            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
            let r = searcher.search(&sq, 1).await.unwrap();
            r[0].doc_id
        };
        let dn_doc_id = {
            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
            let r = searcher.search(&dq, 1).await.unwrap();
            r[0].doc_id
        };

        assert_eq!(
            ft_doc_id, sp_doc_id,
            "Full-text and sparse should find same doc"
        );
        assert_eq!(
            sp_doc_id, dn_doc_id,
            "Sparse and dense should find same doc"
        );
    }

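    // Twenty unique needle terms spread across four committed batches: every
    // needle must be retrievable, and a shared filler term must match all hay.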
    #[tokio::test]
    async fn test_many_needles_all_found() {
        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_needles = 20usize;
        let hay_per_batch = 50usize;
        let needle_terms: Vec<String> = (0..num_needles)
            .map(|i| format!("uniqueneedle{:04}", i))
            .collect();

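        // Interleave hay and needles across four committed batches (segments).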
        for batch in 0..4 {
            for i in 0..hay_per_batch {
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("hay batch {} item {} common filler", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            for n in 0..5 {
                let needle_idx = batch * 5 + n;
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("this is {} among many documents", needle_terms[needle_idx]),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

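        // Every needle term should match exactly one document.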
        let index = Index::open(dir, config).await.unwrap();
        let total = index.num_docs().await.unwrap();
        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);

        for term in &needle_terms {
            let results = index.query(term, 10).await.unwrap();
            assert_eq!(
                results.hits.len(),
                1,
                "Should find exactly 1 doc for needle '{}'",
                term
            );
        }

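        // The shared filler term should match every hay document.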
        let results = index.query("common", 500).await.unwrap();
        assert_eq!(
            results.hits.len(),
            hay_per_batch * 4,
            "Common term should match all {} hay docs",
            hay_per_batch * 4
        );
    }
}