#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;

mod searcher;
pub use searcher::Searcher;

#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

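/// Configuration for an [`Index`]: thread counts, cache sizing, and merge behavior.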
#[derive(Debug, Clone)]
pub struct IndexConfig {
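    /// Total number of worker threads available to this index.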
    pub num_threads: usize,
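    /// Number of threads used to index documents in parallel.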
    pub num_indexing_threads: usize,
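    /// Number of threads used to compress segment data.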
    pub num_compression_threads: usize,
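    /// Number of blocks kept in the term dictionary cache.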
    pub term_cache_blocks: usize,
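    /// Number of blocks kept in the document store cache.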
    pub store_cache_blocks: usize,
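    /// Memory budget, in bytes, for buffered documents before a segment is flushed.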
    pub max_indexing_memory_bytes: usize,
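    /// Policy that selects which segments to merge and when.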
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
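    /// Optimization settings applied to index data structures.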
    pub optimization: crate::structures::IndexOptimization,
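    /// Interval, in milliseconds, at which readers poll for newly committed segments.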
    pub reload_interval_ms: u64,
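    /// Maximum number of segment merges allowed to run concurrently.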
    pub max_concurrent_merges: usize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let indexing_threads = crate::default_indexing_threads();
        #[cfg(not(feature = "native"))]
        let indexing_threads = 1;

        #[cfg(feature = "native")]
        let compression_threads = crate::default_compression_threads();
        #[cfg(not(feature = "native"))]
        let compression_threads = 1;

        Self {
            num_threads: indexing_threads,
            num_indexing_threads: 1,
            num_compression_threads: compression_threads,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: 256 * 1024 * 1024,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
            reload_interval_ms: 1000,
            max_concurrent_merges: 4,
        }
    }
}

#[cfg(feature = "native")]
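/// A searchable index over a directory implementation.
///
/// Ties together the schema, the [`IndexConfig`], and the segment manager,
/// and hands out a cached [`IndexReader`] for searching.
///
/// Minimal usage sketch (assuming the `native` feature and an in-memory
/// `RamDirectory`, as used in the tests below):
///
/// ```ignore
/// let index = Index::create(RamDirectory::new(), schema, IndexConfig::default()).await?;
/// let mut writer = index.writer();
/// writer.add_document(doc)?;
/// writer.commit().await?;
/// let response = index.query("hello", 10).await?;
/// ```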
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
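    /// Creates a new, empty index in `directory` with the given schema and configuration.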
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

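    /// Opens an existing index from `directory`, restoring schema and segment metadata.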
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }

    pub fn directory(&self) -> &D {
        &self.directory
    }

    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

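    /// Returns the shared [`IndexReader`], initializing it on first call.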
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

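    /// Fields searched when a query names no field: the schema's default fields,
    /// or every indexed text field if none are configured.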
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

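    /// Returns the tokenizer registry used for query parsing (currently the default registry).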
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

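    /// Builds a query parser over the default fields, attaching a field router
    /// when the schema defines routing rules.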
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

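    /// Parses `query_str` with [`Self::query_parser`] and returns the top `limit` hits.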
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

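    /// Like [`Self::query`], but skips the first `offset` hits.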
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

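    /// Runs an already-built query and returns the top `limit` hits.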
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

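    /// Searches every segment concurrently, merges the per-segment results by
    /// descending score, then applies `offset` and `limit`.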
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

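        // Each segment must return enough candidates to cover the global offset + limit.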
        let fetch_limit = offset + limit;

        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

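        // Await all segment searches, then merge their batches for global ranking.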
        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

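    /// Fetches the stored document at `address`, if present.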
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

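    /// Looks up the posting list for `term` in `field`, returning one entry per
    /// segment that contains the term.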
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
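    /// Creates an [`IndexWriter`] for adding documents to this index.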
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::{Document, SchemaBuilder};

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    #[tokio::test]
    async fn test_maxscore_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }

    #[tokio::test]
    async fn test_boolean_or_maxscore_optimization() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let results = index.search(&query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 8, "Should find all matching docs");
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4),
                nprobe: 2,
                build_threshold: Some(50),
                unit_norm: false,
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        writer.build_vector_index().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.segment_manager.trained().is_some());
    }

    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }

    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        for i in 0..total_docs {
            let doc = searcher.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_auto_merge_triggered() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..50 {
                let mut doc = Document::new();
                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
                doc.add_text(
                    body,
                    format!(
                        "the quick brown fox jumps over lazy dog number {} round {}",
                        i, batch
                    ),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_merge = writer.segment_manager.get_segment_ids().await.len();

        writer.wait_for_merging_thread().await;
        writer.maybe_merge().await;
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let segment_count = index.segment_readers().await.unwrap().len();
        eprintln!(
            "Segments: {} before merge, {} after auto-merge",
            pre_merge, segment_count
        );
        assert!(
            segment_count < pre_merge,
            "Expected auto-merge to reduce segments from {}, got {}",
            pre_merge,
            segment_count
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
                doc.add_dense_vector(embedding, vec);
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let num_docs = index.num_docs().await.unwrap();
        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_force_merge_many_segments() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..50 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("term_{} batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let seg_ids = writer.segment_manager.get_segment_ids().await;
        let pre = seg_ids.len();
        eprintln!("Segments before force_merge: {}", pre);
        assert!(pre >= 2, "Expected multiple segments, got {}", pre);

        writer.force_merge().await.unwrap();

        let index2 = Index::open(dir, config).await.unwrap();
        let post = index2.segment_readers().await.unwrap().len();
        eprintln!("Segments after force_merge: {}", post);
        assert_eq!(post, 1);
        assert_eq!(index2.num_docs().await.unwrap(), 150);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_background_merge_generation() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..15 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let metas = writer
            .segment_manager
            .read_metadata(|m| m.segment_metas.clone())
            .await;

        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Segments after merge: {}, max generation: {}",
            metas.len(),
            max_gen
        );

        assert!(
            max_gen >= 1,
            "Expected at least one merged segment (gen >= 1), got max_gen={}",
            max_gen
        );

        for (id, info) in &metas {
            if info.generation > 0 {
                assert!(
                    !info.ancestors.is_empty(),
                    "Segment {} has gen={} but no ancestors",
                    id,
                    info.generation
                );
            } else {
                assert!(
                    info.ancestors.is_empty(),
                    "Fresh segment {} has gen=0 but has ancestors",
                    id
                );
            }
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_merge_preserves_all_documents() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 1200;
        let docs_per_batch = 60;
        let batches = total_docs / docs_per_batch;

        for batch in 0..batches {
            for i in 0..docs_per_batch {
                let doc_num = batch * docs_per_batch + i;
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("uid_{} common_term batch_{}", doc_num, batch),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
        assert!(
            pre_segments >= 2,
            "Need multiple segments, got {}",
            pre_segments
        );

        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(
            index.num_docs().await.unwrap(),
            total_docs as u32,
            "Doc count mismatch after force_merge"
        );

        let results = index.query("common_term", total_docs + 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs,
            "common_term should match all docs"
        );

        for check in [0, 1, total_docs / 2, total_docs - 1] {
            let q = format!("uid_{}", check);
            let results = index.query(&q, 10).await.unwrap();
            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multi_round_merge_doc_integrity() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut expected_total = 0u64;

        for round in 0..4 {
            let docs_this_round = 50 + round * 25;
            for batch in 0..5 {
                for i in 0..docs_this_round / 5 {
                    let mut doc = Document::new();
                    doc.add_text(
                        title,
                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
                    );
                    writer.add_document(doc).unwrap();
                }
                writer.commit().await.unwrap();
            }
            writer.wait_for_merging_thread().await;

            expected_total += docs_this_round as u64;

            let actual = writer
                .segment_manager
                .read_metadata(|m| {
                    m.segment_metas
                        .values()
                        .map(|s| s.num_docs as u64)
                        .sum::<u64>()
                })
                .await;

            assert_eq!(
                actual, expected_total,
                "Round {}: expected {} docs, metadata reports {}",
                round, expected_total, actual
            );
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);

        let results = index
            .query("searchable", expected_total as usize + 100)
            .await
            .unwrap();
        assert_eq!(
            results.hits.len(),
            expected_total as usize,
            "All docs should match 'searchable'"
        );

        let metas = index
            .segment_manager()
            .read_metadata(|m| m.segment_metas.clone())
            .await;
        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Final: {} segments, {} docs, max generation={}",
            metas.len(),
            expected_total,
            max_gen
        );
        assert!(
            max_gen >= 1,
            "Multiple merge rounds should produce gen >= 1"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_segment_count_bounded_during_sustained_indexing() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, false);
        let schema = schema_builder.build();

        let policy = crate::merge::TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            tier_factor: 10.0,
            tier_floor: 50,
            max_merged_docs: 1_000_000,
        };

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 1,
            merge_policy: Box::new(policy),
            max_concurrent_merges: 4,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_commits = 40;
        let docs_per_commit = 30;
        let total_docs = num_commits * docs_per_commit;
        let mut max_segments_seen = 0usize;

        for commit_idx in 0..num_commits {
            for i in 0..docs_per_commit {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("doc_{} text", commit_idx * docs_per_commit + i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();

            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let seg_count = writer.segment_manager.get_segment_ids().await.len();
            max_segments_seen = max_segments_seen.max(seg_count);
        }

        writer.wait_for_all_merges().await;

        let final_segments = writer.segment_manager.get_segment_ids().await.len();
        let final_docs: u64 = writer
            .segment_manager
            .read_metadata(|m| {
                m.segment_metas
                    .values()
                    .map(|s| s.num_docs as u64)
                    .sum::<u64>()
            })
            .await;

        eprintln!(
            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
            num_commits, total_docs, final_segments, max_segments_seen
        );

        let max_allowed = num_commits / 2;
        assert!(
            max_segments_seen <= max_allowed,
            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
             Merging is not keeping up.",
            max_segments_seen,
            max_allowed,
            num_commits
        );

        assert!(
            final_segments <= 10,
            "After all merges, expected ≤10 segments, got {}",
            final_segments
        );

        assert_eq!(
            final_docs, total_docs as u64,
            "Expected {} docs, metadata reports {}",
            total_docs, final_docs
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_single_segment() {
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay document number {}", i));
            doc.add_text(
                body,
                "common words repeated across all hay documents filler text",
            );
            writer.add_document(doc).unwrap();
        }

        let mut needle = Document::new();
        needle.add_text(title, "The unique needle xylophone");
        needle.add_text(
            body,
            "This document contains the extraordinary term xylophone",
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay document {}", i));
            doc.add_text(body, "common words filler text again and again");
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let results = index.query("xylophone", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
        assert!(results.hits[0].score > 0.0, "Score should be positive");

        let doc = index
            .get_document(&results.hits[0].address)
            .await
            .unwrap()
            .unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert!(
            title_val.contains("xylophone"),
            "Retrieved doc should be the needle"
        );

        let results = index.query("common", 200).await.unwrap();
        assert!(
            results.hits.len() >= 100,
            "Common term should match many docs"
        );

        let results = index.query("nonexistentterm99999", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            0,
            "Non-existent term should match nothing"
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_multi_segment() {
        use crate::query::TermQuery;

        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment one hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let mut needle = Document::new();
        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
        writer.add_document(needle).unwrap();
        for i in 0..49 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment two hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment three hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 150);
        let num_segments = index.segment_readers().await.unwrap().len();
        assert!(
            num_segments >= 2,
            "Should have multiple segments, got {}",
            num_segments
        );

        let results = index.query("quetzalcoatl", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            1,
            "Should find exactly 1 needle across segments"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let tq = TermQuery::text(content, "quetzalcoatl");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "TermQuery should also find the needle");

        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let text = doc.get_first(content).unwrap().as_text().unwrap();
        assert!(
            text.contains("quetzalcoatl"),
            "Should retrieve needle content"
        );

        let results = index.query("document", 200).await.unwrap();
        assert!(
            results.hits.len() >= 149,
            "Should find hay docs across all segments"
        );
    }

    #[tokio::test]
    async fn test_needle_sparse_vector() {
        use crate::query::SparseVectorQuery;

        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10)
                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
                .collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        let mut needle = Document::new();
        needle.add_text(title, "Needle sparse document");
        needle.add_sparse_vector(
            sparse,
            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
        assert!(results[0].score > 0.0, "Needle score should be positive");

        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(title_val, "Needle sparse document");

        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
        let results = searcher.search(&query_shared, 200).await.unwrap();
        assert!(
            results.len() >= 100,
            "Shared dim 5 should match many docs, got {}",
            results.len()
        );

        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
        let results = searcher.search(&query_missing, 10).await.unwrap();
        assert_eq!(
            results.len(),
            0,
            "Non-existent dimension should match nothing"
        );
    }

    #[tokio::test]
    async fn test_needle_sparse_vector_multi_segment_merge() {
        use crate::query::SparseVectorQuery;

        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("seg1 hay {}", i));
            doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let mut needle = Document::new();
        needle.add_text(title, "seg2 needle");
        needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
        writer.add_document(needle).unwrap();
        for i in 0..29 {
            let mut doc = Document::new();
            doc.add_text(title, format!("seg2 hay {}", i));
            doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );
    }
2062
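    // Flat (exact-scan) dense index: an all-ones needle among low-magnitude hay vectors.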
    #[tokio::test]
    async fn test_needle_dense_vector_flat() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::DenseVectorQuery;

        let dim = 16;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
                unit_norm: false,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

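        // Hay: deterministic low-magnitude vectors (every component below 0.1)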
        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay dense doc {}", i));
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

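        // Needle: an exact all-ones vector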
        let mut needle = Document::new();
        needle.add_text(title, "Needle dense document");
        let needle_vec: Vec<f32> = vec![1.0; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 101);

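        // Query with the needle's own vector; the exact match should rank first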
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&query, 5).await.unwrap();
        assert!(!results.is_empty(), "Should find at least 1 result");

        let top_doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(
            top_title, "Needle dense document",
            "Top result should be the needle (exact vector match)"
        );
        assert!(
            results[0].score > 0.9,
            "Exact match should have very high cosine similarity, got {}",
            results[0].score
        );
    }

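    // One needle document must be retrievable through full-text, sparse-vector, and dense-vector queries alike.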
    #[tokio::test]
    async fn test_needle_combined_all_modalities() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};

        let dim = 8;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
                unit_norm: false,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

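        // Hay: filler text plus unremarkable sparse and dense vectors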
        for i in 0..80u32 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay doc {}", i));
            doc.add_text(body, "general filler text about nothing special");
            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

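        // Needle: a unique term, unique sparse dimensions, and a distinctive dense vector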
        let mut needle = Document::new();
        needle.add_text(title, "The extraordinary rhinoceros");
        needle.add_text(
            body,
            "This document about rhinoceros is the only one with this word",
        );
        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
        let needle_vec = vec![0.9; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 81);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();

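        // 1. Full-text: "rhinoceros" occurs only in the needle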
        let tq = TermQuery::text(body, "rhinoceros");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(
            results.len(),
            1,
            "Full-text: should find exactly the needle"
        );
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

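        // 2. Sparse: dimensions 9998 and 9999 exist only in the needle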
        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
        let results = searcher.search(&sq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

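        // 3. Dense: querying with the needle's own vector should rank it first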
        let dq = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&dq, 1).await.unwrap();
        assert!(!results.is_empty(), "Dense: should find at least 1 result");
        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "The extraordinary rhinoceros",
            "Dense: top-1 should be the needle"
        );

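        // All three modalities must agree on the same doc_id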
        let ft_doc_id = {
            let tq = TermQuery::text(body, "rhinoceros");
            let r = searcher.search(&tq, 1).await.unwrap();
            r[0].doc_id
        };
        let sp_doc_id = {
            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
            let r = searcher.search(&sq, 1).await.unwrap();
            r[0].doc_id
        };
        let dn_doc_id = {
            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
            let r = searcher.search(&dq, 1).await.unwrap();
            r[0].doc_id
        };

        assert_eq!(
            ft_doc_id, sp_doc_id,
            "Full-text and sparse should find same doc"
        );
        assert_eq!(
            sp_doc_id, dn_doc_id,
            "Sparse and dense should find same doc"
        );
    }

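    // Twenty uniquely-tagged needles spread across four committed batches must all stay findable.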
    #[tokio::test]
    async fn test_many_needles_all_found() {
        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_needles = 20usize;
        let hay_per_batch = 50usize;
        let needle_terms: Vec<String> = (0..num_needles)
            .map(|i| format!("uniqueneedle{:04}", i))
            .collect();

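        // Each batch: hay docs, then five needles, committed together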
        for batch in 0..4 {
            for i in 0..hay_per_batch {
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("hay batch {} item {} common filler", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            for n in 0..5 {
                let needle_idx = batch * 5 + n;
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("this is {} among many documents", needle_terms[needle_idx]),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        let total = index.num_docs().await.unwrap();
        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);

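        // Every needle term must match exactly its one document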
        for term in &needle_terms {
            let results = index.query(term, 10).await.unwrap();
            assert_eq!(
                results.hits.len(),
                1,
                "Should find exactly 1 doc for needle '{}'",
                term
            );
        }

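        // Sanity check: the shared filler term matches every hay document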
        let results = index.query("common", 500).await.unwrap();
        assert_eq!(
            results.hits.len(),
            hay_per_batch * 4,
            "Common term should match all {} hay docs",
            hay_per_batch * 4
        );
    }
}