// Index module: ties together the schema, segment manager, readers and
// writers. Native-only pieces (writer, reader, helpers) are gated behind
// `feature = "native"`.
#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;

mod searcher;
pub use searcher::Searcher;

#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

/// Filename of the slice cache stored alongside the index files.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

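/// Configuration knobs for an [`Index`].
///
/// `Default` picks reasonable values; individual fields can be overridden
/// with struct-update syntax, as the tests below do:
/// `IndexConfig { max_indexing_memory_bytes: 1024, ..Default::default() }`.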
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Total number of worker threads.
    pub num_threads: usize,
    /// Threads used for indexing documents.
    pub num_indexing_threads: usize,
    /// Threads used for compressing stored data.
    pub num_compression_threads: usize,
    /// Number of term-dictionary cache blocks.
    pub term_cache_blocks: usize,
    /// Number of document-store cache blocks.
    pub store_cache_blocks: usize,
    /// Memory budget for buffered documents before a segment is flushed.
    pub max_indexing_memory_bytes: usize,
    /// Policy deciding which segments to merge.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index-level optimization settings.
    pub optimization: crate::structures::IndexOptimization,
    /// How often readers check for new segments, in milliseconds.
    pub reload_interval_ms: u64,
    /// Maximum number of merges allowed to run at once.
    pub max_concurrent_merges: usize,
}
69
70impl Default for IndexConfig {
71 fn default() -> Self {
72 #[cfg(feature = "native")]
73 let indexing_threads = crate::default_indexing_threads();
74 #[cfg(not(feature = "native"))]
75 let indexing_threads = 1;
76
77 #[cfg(feature = "native")]
78 let compression_threads = crate::default_compression_threads();
79 #[cfg(not(feature = "native"))]
80 let compression_threads = 1;
81
82 Self {
83 num_threads: indexing_threads,
84 num_indexing_threads: 1, num_compression_threads: compression_threads,
86 term_cache_blocks: 256,
87 store_cache_blocks: 32,
88 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90 optimization: crate::structures::IndexOptimization::default(),
91 reload_interval_ms: 1000, max_concurrent_merges: 4,
93 }
94 }
95}
96
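/// A search index over a [`DirectoryWriter`](crate::directories::DirectoryWriter).
///
/// Writing goes through [`IndexWriter`]; searching goes through the cached
/// [`IndexReader`] returned by [`Index::reader`], or the convenience
/// [`Index::query`] / [`Index::search`] methods.
///
/// A minimal sketch of the flow, mirroring the tests below (marked `ignore`
/// so it is not compiled as a doctest):
///
/// ```ignore
/// let dir = RamDirectory::new();
/// let mut writer = IndexWriter::create(dir.clone(), schema, IndexConfig::default()).await?;
/// writer.add_document(doc)?;
/// writer.commit().await?;
///
/// let index = Index::open(dir, IndexConfig::default()).await?;
/// let results = index.query("title:hello", 10).await?;
/// ```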
97#[cfg(feature = "native")]
106pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
107 directory: Arc<D>,
108 schema: Arc<Schema>,
109 config: IndexConfig,
110 segment_manager: Arc<crate::merge::SegmentManager<D>>,
112 cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
114}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
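    /// Creates a new, empty index in `directory` with the given `schema`,
    /// persisting the initial [`IndexMetadata`].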
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Persist the initial (empty) metadata to the directory.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

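    /// Opens an existing index from `directory`, restoring the schema from
    /// the stored metadata and republishing any trained vector structures.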
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the shared handle to the schema.
    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Returns the underlying directory.
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Returns the segment manager coordinating segment metadata and merges.
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

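    /// Returns the shared [`IndexReader`], creating it on first call.
    ///
    /// The reader is cached in a `OnceCell`, so every caller observes the
    /// same reader instance and reload cadence.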
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    /// Returns the index configuration.
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    /// Returns the segment readers of the current searcher snapshot.
    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    /// Returns the total number of documents across all segments.
    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

    /// Fields searched when a query does not name a field: the schema's
    /// declared default fields, or every indexed text field if none are
    /// declared.
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Returns the tokenizer registry used for query parsing.
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

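    /// Builds a query parser over this index's default fields.
    ///
    /// If the schema declares query-routing rules, the parser is constructed
    /// with a [`crate::dsl::QueryFieldRouter`]; otherwise a plain parser is
    /// returned.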
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

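    /// Parses `query_str` and returns the top `limit` hits. Shorthand for
    /// [`Self::query_offset`] with an offset of `0`.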
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Parses `query_str` and returns `limit` hits starting at `offset`.
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

    /// Executes `query` and returns the top `limit` hits. Shorthand for
    /// [`Self::search_offset`] with an offset of `0`.
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

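    /// Executes `query` against every segment concurrently, merges the
    /// per-segment results by descending score, and returns `limit` hits
    /// starting at `offset`. Each segment is asked for the top
    /// `offset + limit` candidates so the merged page is globally correct.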
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

        let fetch_limit = offset + limit;

        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Fetches a stored document by its [`DocAddress`](crate::query::DocAddress).
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

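    /// Looks up the posting list for `term` in `field` in every segment,
    /// returning `(segment, postings)` pairs for the segments that contain
    /// the term.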
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
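    /// Creates an [`IndexWriter`] that appends documents to this index.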
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::{Document, SchemaBuilder};

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // "world" appears in the title of both documents.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let doc = searcher.doc(seg_id, 0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(seg_id, i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let doc = searcher.doc(seg_id, 0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let doc = searcher.doc(seg_id, 0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    #[tokio::test]
    async fn test_maxscore_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // OR query: matches any doc containing "rust" or "programming".
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        // A single SHOULD clause degenerates to a plain term query.
        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // MUST restricts matches; the SHOULD clause only influences scoring.
        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // MUST NOT filters out docs containing "systems".
        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // A limit below the match count returns only the top-scoring hits.
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }

    #[tokio::test]
    async fn test_boolean_or_maxscore_optimization() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let results = index.search(&query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 8, "Should find all matching docs");
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4),
                nprobe: 2,
                build_threshold: Some(50),
                unit_norm: false,
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 30 docs: below the build threshold of 50, so no training happens.
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        // Below the threshold, vector search falls back to a flat scan.
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1.0,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // Push the total to 60 docs, crossing the build threshold.
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        writer.build_vector_index().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1.0,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Rebuilding again should leave the index in a trained state.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.segment_manager.trained().is_some());
    }

    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        let seg_id1 = searcher1.segment_readers()[0].meta().id;
        for i in 0..50 {
            let doc = searcher1.doc(seg_id1, i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        let seg_id2 = searcher2.segment_readers()[0].meta().id;
        for i in 0..80 {
            let doc = searcher2.doc(seg_id2, i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }

    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 8 batches x 25 docs = 200 documents.
        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        for i in 0..total_docs {
            let doc = searcher.doc(seg_id, i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_auto_merge_triggered() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..12 {
            for i in 0..50 {
                let mut doc = Document::new();
                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
                doc.add_text(
                    body,
                    format!(
                        "the quick brown fox jumps over lazy dog number {} round {}",
                        i, batch
                    ),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_merge = writer.segment_manager.get_segment_ids().await.len();

        // Drain in-flight merges, then trigger one more merge pass.
        writer.wait_for_merging_thread().await;
        writer.maybe_merge().await;
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let segment_count = index.segment_readers().await.unwrap().len();
        eprintln!(
            "Segments: {} before merge, {} after auto-merge",
            pre_merge, segment_count
        );
        assert!(
            segment_count < pre_merge,
            "Expected auto-merge to reduce segments from {}, got {}",
            pre_merge,
            segment_count
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 12 commits x 5 docs = 60 documents, well past the build threshold.
        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
                doc.add_dense_vector(embedding, vec);
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let num_docs = index.num_docs().await.unwrap();
        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_force_merge_many_segments() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..50 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("term_{} batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let seg_ids = writer.segment_manager.get_segment_ids().await;
        let pre = seg_ids.len();
        eprintln!("Segments before force_merge: {}", pre);
        assert!(pre >= 2, "Expected multiple segments, got {}", pre);

        writer.force_merge().await.unwrap();

        let index2 = Index::open(dir, config).await.unwrap();
        let post = index2.segment_readers().await.unwrap().len();
        eprintln!("Segments after force_merge: {}", post);
        assert_eq!(post, 1);
        assert_eq!(index2.num_docs().await.unwrap(), 150);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_background_merge_generation() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..15 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        let metas = writer
            .segment_manager
            .read_metadata(|m| m.segment_metas.clone())
            .await;

        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Segments after merge: {}, max generation: {}",
            metas.len(),
            max_gen
        );

        assert!(
            max_gen >= 1,
            "Expected at least one merged segment (gen >= 1), got max_gen={}",
            max_gen
        );

        for (id, info) in &metas {
            if info.generation > 0 {
                assert!(
                    !info.ancestors.is_empty(),
                    "Segment {} has gen={} but no ancestors",
                    id,
                    info.generation
                );
            } else {
                assert!(
                    info.ancestors.is_empty(),
                    "Fresh segment {} has gen=0 but has ancestors",
                    id
                );
            }
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_merge_preserves_all_documents() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 1200;
        let docs_per_batch = 60;
        let batches = total_docs / docs_per_batch;

        for batch in 0..batches {
            for i in 0..docs_per_batch {
                let doc_num = batch * docs_per_batch + i;
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("uid_{} common_term batch_{}", doc_num, batch),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
        assert!(
            pre_segments >= 2,
            "Need multiple segments, got {}",
            pre_segments
        );

        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(
            index.num_docs().await.unwrap(),
            total_docs as u32,
            "Doc count mismatch after force_merge"
        );

        let results = index.query("common_term", total_docs + 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs,
            "common_term should match all docs"
        );

        for check in [0, 1, total_docs / 2, total_docs - 1] {
            let q = format!("uid_{}", check);
            let results = index.query(&q, 10).await.unwrap();
            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multi_round_merge_doc_integrity() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut expected_total = 0u64;

        for round in 0..4 {
            // Each round indexes more docs than the previous one.
            let docs_this_round = 50 + round * 25;
            for batch in 0..5 {
                for i in 0..docs_this_round / 5 {
                    let mut doc = Document::new();
                    doc.add_text(
                        title,
                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
                    );
                    writer.add_document(doc).unwrap();
                }
                writer.commit().await.unwrap();
            }
            writer.wait_for_merging_thread().await;

            expected_total += docs_this_round as u64;

            let actual = writer
                .segment_manager
                .read_metadata(|m| {
                    m.segment_metas
                        .values()
                        .map(|s| s.num_docs as u64)
                        .sum::<u64>()
                })
                .await;

            assert_eq!(
                actual, expected_total,
                "Round {}: expected {} docs, metadata reports {}",
                round, expected_total, actual
            );
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);

        let results = index
            .query("searchable", expected_total as usize + 100)
            .await
            .unwrap();
        assert_eq!(
            results.hits.len(),
            expected_total as usize,
            "All docs should match 'searchable'"
        );

        let metas = index
            .segment_manager()
            .read_metadata(|m| m.segment_metas.clone())
            .await;
        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Final: {} segments, {} docs, max generation={}",
            metas.len(),
            expected_total,
            max_gen
        );
        assert!(
            max_gen >= 1,
            "Multiple merge rounds should produce gen >= 1"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_segment_count_bounded_during_sustained_indexing() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, false);
        let schema = schema_builder.build();

        let policy = crate::merge::TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            tier_factor: 10.0,
            tier_floor: 50,
            max_merged_docs: 1_000_000,
        };

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 1,
            merge_policy: Box::new(policy),
            max_concurrent_merges: 4,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_commits = 40;
        let docs_per_commit = 30;
        let total_docs = num_commits * docs_per_commit;
        let mut max_segments_seen = 0usize;

        for commit_idx in 0..num_commits {
            for i in 0..docs_per_commit {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("doc_{} text", commit_idx * docs_per_commit + i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();

            // Give background merges a moment to make progress.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let seg_count = writer.segment_manager.get_segment_ids().await.len();
            max_segments_seen = max_segments_seen.max(seg_count);
        }

        writer.wait_for_all_merges().await;

        let final_segments = writer.segment_manager.get_segment_ids().await.len();
        let final_docs: u64 = writer
            .segment_manager
            .read_metadata(|m| {
                m.segment_metas
                    .values()
                    .map(|s| s.num_docs as u64)
                    .sum::<u64>()
            })
            .await;

        eprintln!(
            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
            num_commits, total_docs, final_segments, max_segments_seen
        );

        // If merging keeps pace, the live segment count should stay well
        // below one segment per commit.
        let max_allowed = num_commits / 2;
        assert!(
            max_segments_seen <= max_allowed,
            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
             Merging is not keeping up.",
            max_segments_seen,
            max_allowed,
            num_commits
        );

        assert!(
            final_segments <= 10,
            "After all merges, expected ≤10 segments, got {}",
            final_segments
        );

        assert_eq!(
            final_docs, total_docs as u64,
            "Expected {} docs, metadata reports {}",
            total_docs, final_docs
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_single_segment() {
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 100 hay documents.
        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay document number {}", i));
            doc.add_text(
                body,
                "common words repeated across all hay documents filler text",
            );
            writer.add_document(doc).unwrap();
        }

        // The needle: the only document containing "xylophone".
        let mut needle = Document::new();
        needle.add_text(title, "The unique needle xylophone");
        needle.add_text(
            body,
            "This document contains the extraordinary term xylophone",
        );
        writer.add_document(needle).unwrap();

        // More hay after the needle.
        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay document {}", i));
            doc.add_text(body, "common words filler text again and again");
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let results = index.query("xylophone", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
        assert!(results.hits[0].score > 0.0, "Score should be positive");

        let doc = index
            .get_document(&results.hits[0].address)
            .await
            .unwrap()
            .unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert!(
            title_val.contains("xylophone"),
            "Retrieved doc should be the needle"
        );

        let results = index.query("common", 200).await.unwrap();
        assert!(
            results.hits.len() >= 100,
            "Common term should match many docs"
        );

        let results = index.query("nonexistentterm99999", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            0,
            "Non-existent term should match nothing"
        );
    }

    #[tokio::test]
    async fn test_needle_fulltext_multi_segment() {
        use crate::query::TermQuery;

        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment one hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let mut needle = Document::new();
        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
        writer.add_document(needle).unwrap();
        for i in 0..49 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment two hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        for i in 0..50 {
            let mut doc = Document::new();
            doc.add_text(content, format!("segment three hay document {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 150);
        let num_segments = index.segment_readers().await.unwrap().len();
        assert!(
            num_segments >= 2,
            "Should have multiple segments, got {}",
            num_segments
        );

        let results = index.query("quetzalcoatl", 10).await.unwrap();
        assert_eq!(
            results.hits.len(),
            1,
            "Should find exactly 1 needle across segments"
        );

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let tq = TermQuery::text(content, "quetzalcoatl");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "TermQuery should also find the needle");

        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        let text = doc.get_first(content).unwrap().as_text().unwrap();
        assert!(
            text.contains("quetzalcoatl"),
            "Should retrieve needle content"
        );

        let results = index.query("document", 200).await.unwrap();
        assert!(
            results.hits.len() >= 149,
            "Should find hay docs across all segments"
        );
    }

    #[tokio::test]
    async fn test_needle_sparse_vector() {
        use crate::query::SparseVectorQuery;

        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10)
                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
                .collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        let mut needle = Document::new();
        needle.add_text(title, "Needle sparse document");
        needle.add_sparse_vector(
            sparse,
            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
        );
        writer.add_document(needle).unwrap();

        for i in 100..150 {
            let mut doc = Document::new();
            doc.add_text(title, format!("More hay sparse doc {}", i));
            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
            doc.add_sparse_vector(sparse, entries);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 151);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
        assert!(results[0].score > 0.0, "Needle score should be positive");

        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(title_val, "Needle sparse document");

        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
        let results = searcher.search(&query_shared, 200).await.unwrap();
        assert!(
            results.len() >= 100,
            "Shared dim 5 should match many docs, got {}",
            results.len()
        );

        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
        let results = searcher.search(&query_missing, 10).await.unwrap();
        assert_eq!(
            results.len(),
            0,
            "Non-existent dimension should match nothing"
        );
    }

2002 #[tokio::test]
2004 async fn test_needle_sparse_vector_multi_segment_merge() {
2005 use crate::query::SparseVectorQuery;
2006
2007 let mut sb = SchemaBuilder::default();
2008 let title = sb.add_text_field("title", true, true);
2009 let sparse = sb.add_sparse_vector_field("sparse", true, true);
2010 let schema = sb.build();
2011
2012 let dir = RamDirectory::new();
2013 let config = IndexConfig::default();
2014 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2015 .await
2016 .unwrap();
2017
2018 for i in 0..30 {
2020 let mut doc = Document::new();
2021 doc.add_text(title, format!("seg1 hay {}", i));
2022 doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2023 writer.add_document(doc).unwrap();
2024 }
2025 writer.commit().await.unwrap();
2026
        let mut needle = Document::new();
        needle.add_text(title, "seg2 needle");
        needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
        writer.add_document(needle).unwrap();
        for i in 0..29 {
            let mut doc = Document::new();
            doc.add_text(title, format!("seg2 hay {}", i));
            doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );

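        // Force-merge the two segments into one.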
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

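        // Re-open and confirm the needle survived the merge.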
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 60);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
        let results = searcher.search(&query, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "seg2 needle"
        );
    }

    #[tokio::test]
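    // With a flat dense-vector index, an exact vector match must come back as the top hit.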
    async fn test_needle_dense_vector_flat() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::DenseVectorQuery;

        let dim = 16;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
                unit_norm: false,
            },
        );
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..100 {
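            // Hay: deterministic low-magnitude vectors, nowhere near the needle.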
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay dense doc {}", i));
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

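        // Needle: an all-ones vector unlike anything in the hay.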
        let mut needle = Document::new();
        needle.add_text(title, "Needle dense document");
        let needle_vec: Vec<f32> = vec![1.0; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 101);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let query = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&query, 5).await.unwrap();
        assert!(!results.is_empty(), "Should find at least 1 result");

        let top_doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
        assert_eq!(
            top_title, "Needle dense document",
            "Top result should be the needle (exact vector match)"
        );
        assert!(
            results[0].score > 0.9,
            "Exact match should have very high cosine similarity, got {}",
            results[0].score
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
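    // A single needle must be retrievable through all three modalities:
    // full-text term, sparse vector, and dense vector.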
    async fn test_needle_combined_all_modalities() {
        use crate::directories::MmapDirectory;
        use crate::dsl::{DenseVectorConfig, VectorIndexType};
        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let dim = 8;
        let mut sb = SchemaBuilder::default();
        let title = sb.add_text_field("title", true, true);
        let body = sb.add_text_field("body", true, true);
        let sparse = sb.add_sparse_vector_field("sparse", true, true);
        let embedding = sb.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim,
                index_type: VectorIndexType::Flat,
                quantization: crate::dsl::DenseVectorQuantization::F32,
                num_clusters: None,
                nprobe: 0,
                build_threshold: None,
                unit_norm: false,
            },
        );
        let schema = sb.build();

        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

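        // Hay docs populate all three modalities with unremarkable values.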
        for i in 0..80u32 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Hay doc {}", i));
            doc.add_text(body, "general filler text about nothing special");
            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
            let vec: Vec<f32> = (0..dim)
                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
                .collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }

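        // Needle: unique term, unique sparse dims, distinctive dense vector.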
        let mut needle = Document::new();
        needle.add_text(title, "The extraordinary rhinoceros");
        needle.add_text(
            body,
            "This document about rhinoceros is the only one with this word",
        );
        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
        let needle_vec = vec![0.9; dim];
        needle.add_dense_vector(embedding, needle_vec.clone());
        writer.add_document(needle).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 81);

        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();

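        // Full-text: "rhinoceros" occurs only in the needle.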
        let tq = TermQuery::text(body, "rhinoceros");
        let results = searcher.search(&tq, 10).await.unwrap();
        assert_eq!(
            results.len(),
            1,
            "Full-text: should find exactly the needle"
        );
        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

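        // Sparse: dims 9998 and 9999 occur only in the needle.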
        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
        let results = searcher.search(&sq, 10).await.unwrap();
        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        assert!(
            doc.get_first(title)
                .unwrap()
                .as_text()
                .unwrap()
                .contains("rhinoceros")
        );

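        // Dense: query with the needle's exact vector.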
        let dq = DenseVectorQuery::new(embedding, needle_vec);
        let results = searcher.search(&dq, 1).await.unwrap();
        assert!(!results.is_empty(), "Dense: should find at least 1 result");
        let doc = searcher
            .doc(results[0].segment_id, results[0].doc_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            doc.get_first(title).unwrap().as_text().unwrap(),
            "The extraordinary rhinoceros",
            "Dense: top-1 should be the needle"
        );

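        // All three modalities should converge on the same doc_id.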
        let ft_doc_id = {
            let tq = TermQuery::text(body, "rhinoceros");
            let r = searcher.search(&tq, 1).await.unwrap();
            r[0].doc_id
        };
        let sp_doc_id = {
            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
            let r = searcher.search(&sq, 1).await.unwrap();
            r[0].doc_id
        };
        let dn_doc_id = {
            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
            let r = searcher.search(&dq, 1).await.unwrap();
            r[0].doc_id
        };

        assert_eq!(
            ft_doc_id, sp_doc_id,
            "Full-text and sparse should find same doc"
        );
        assert_eq!(
            sp_doc_id, dn_doc_id,
            "Sparse and dense should find same doc"
        );
    }

    #[tokio::test]
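    // Twenty unique needle terms spread across four committed batches must each match exactly one doc.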
    async fn test_many_needles_all_found() {
        let mut sb = SchemaBuilder::default();
        let content = sb.add_text_field("content", true, true);
        let schema = sb.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_needles = 20usize;
        let hay_per_batch = 50usize;
        let needle_terms: Vec<String> = (0..num_needles)
            .map(|i| format!("uniqueneedle{:04}", i))
            .collect();

        for batch in 0..4 {
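            // Each batch: hay docs, then five needles, then a commit.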
            for i in 0..hay_per_batch {
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("hay batch {} item {} common filler", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
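            // Five needles per batch, each with a globally unique term.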
            for n in 0..5 {
                let needle_idx = batch * 5 + n;
                let mut doc = Document::new();
                doc.add_text(
                    content,
                    format!("this is {} among many documents", needle_terms[needle_idx]),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        let total = index.num_docs().await.unwrap();
        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);

        for term in &needle_terms {
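            // Every needle term must match exactly one document.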
            let results = index.query(term, 10).await.unwrap();
            assert_eq!(
                results.hits.len(),
                1,
                "Should find exactly 1 doc for needle '{}'",
                term
            );
        }

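        // The shared filler term should match every hay doc.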
        let results = index.query("common", 500).await.unwrap();
        assert_eq!(
            results.hits.len(),
            hay_per_batch * 4,
            "Common term should match all {} hay docs",
            hay_per_batch * 4
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
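    // Stored fields must survive repeated commit + force-merge cycles: after each
    // merge, every document is fetched from the store and its title is verified.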
    async fn test_store_fields_survive_multiple_merges() {
        use crate::directories::MmapDirectory;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", false, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 1024 * 64,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::NoMergePolicy),
            ..Default::default()
        };

        let make_doc = |round: usize, idx: usize| -> Document {
            let mut doc = Document::new();
            doc.add_text(title, format!("doc_r{}_i{} searchterm", round, idx));
            let body_text = format!("round={} idx={} {}", round, idx, "abcdefghij ".repeat(90));
            doc.add_text(body, body_text);
            doc
        };

        let mut total_docs = 0usize;

        {
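            // Round 1: four committed batches of 50 docs each.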
            let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
                .await
                .unwrap();
            for batch in 0..4 {
                for i in 0..50 {
                    writer.add_document(make_doc(1, batch * 50 + i)).unwrap();
                }
                writer.commit().await.unwrap();
            }
            total_docs += 200;
        }

        {
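            // Merge 1: collapse the four segments into one.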
            let mut writer = IndexWriter::open(dir.clone(), config.clone())
                .await
                .unwrap();
            writer.force_merge().await.unwrap();
        }

        {
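            // Verify every stored doc after merge 1.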
            let index = Index::open(dir.clone(), config.clone()).await.unwrap();
            assert_eq!(index.num_docs().await.unwrap(), total_docs as u32);

            let reader = index.reader().await.unwrap();
            let searcher = reader.searcher().await.unwrap();
            assert_eq!(
                searcher.num_segments(),
                1,
                "should be 1 segment after merge 1"
            );
            let seg = &searcher.segment_readers()[0];
            let seg_id = seg.meta().id;

            assert_eq!(
                seg.store().num_docs(),
                seg.num_docs(),
                "store.num_docs != meta.num_docs after merge 1"
            );

            for i in 0..total_docs as u32 {
                let doc = searcher
                    .doc(seg_id, i)
                    .await
                    .unwrap_or_else(|e| panic!("doc {} error: {}", i, e));
                assert!(doc.is_some(), "doc {} missing after merge 1", i);
                let doc = doc.unwrap();
                let t = doc
                    .get_first(title)
                    .unwrap_or_else(|| panic!("doc {} missing title", i));
                assert!(
                    t.as_text().unwrap().contains("searchterm"),
                    "doc {} title corrupt after merge 1",
                    i
                );
            }
        }

        {
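            // Round 2: add 150 more docs, then merge again.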
            let mut writer = IndexWriter::open(dir.clone(), config.clone())
                .await
                .unwrap();
            for batch in 0..3 {
                for i in 0..50 {
                    writer.add_document(make_doc(2, batch * 50 + i)).unwrap();
                }
                writer.commit().await.unwrap();
            }
            total_docs += 150;
            writer.force_merge().await.unwrap();
        }

        {
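            // Verify every stored doc after merge 2.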
            let index = Index::open(dir.clone(), config.clone()).await.unwrap();
            assert_eq!(index.num_docs().await.unwrap(), total_docs as u32);

            let reader = index.reader().await.unwrap();
            let searcher = reader.searcher().await.unwrap();
            assert_eq!(
                searcher.num_segments(),
                1,
                "should be 1 segment after merge 2"
            );
            let seg = &searcher.segment_readers()[0];
            let seg_id = seg.meta().id;

            assert_eq!(
                seg.store().num_docs(),
                seg.num_docs(),
                "store.num_docs != meta.num_docs after merge 2"
            );

            for i in 0..total_docs as u32 {
                let doc = searcher
                    .doc(seg_id, i)
                    .await
                    .unwrap_or_else(|e| panic!("doc {} error: {}", i, e));
                assert!(doc.is_some(), "doc {} missing after merge 2", i);
                let doc = doc.unwrap();
                let t = doc
                    .get_first(title)
                    .unwrap_or_else(|| panic!("doc {} missing title", i));
                assert!(
                    t.as_text().unwrap().contains("searchterm"),
                    "doc {} title corrupt after merge 2",
                    i
                );
            }
        }

        {
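            // Round 3: add 100 more docs, then merge a third time.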
            let mut writer = IndexWriter::open(dir.clone(), config.clone())
                .await
                .unwrap();
            for batch in 0..2 {
                for i in 0..50 {
                    writer.add_document(make_doc(3, batch * 50 + i)).unwrap();
                }
                writer.commit().await.unwrap();
            }
            total_docs += 100;
            writer.force_merge().await.unwrap();
        }

        {
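            // Verify after merge 3, tallying missing and corrupt docs.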
            let index = Index::open(dir.clone(), config.clone()).await.unwrap();
            assert_eq!(index.num_docs().await.unwrap(), total_docs as u32);

            let reader = index.reader().await.unwrap();
            let searcher = reader.searcher().await.unwrap();
            assert_eq!(
                searcher.num_segments(),
                1,
                "should be 1 segment after merge 3"
            );
            let seg = &searcher.segment_readers()[0];
            let seg_id = seg.meta().id;

            assert_eq!(
                seg.store().num_docs(),
                seg.num_docs(),
                "store.num_docs != meta.num_docs after merge 3"
            );

            let mut missing = 0;
            let mut corrupt = 0;
            for i in 0..total_docs as u32 {
                match searcher.doc(seg_id, i).await {
                    Ok(Some(doc)) => {
                        if let Some(t) = doc.get_first(title) {
                            if !t.as_text().unwrap_or("").contains("searchterm") {
                                corrupt += 1;
                            }
                        } else {
                            corrupt += 1;
                        }
                    }
                    Ok(None) => missing += 1,
                    Err(e) => panic!("doc {} error after merge 3: {}", i, e),
                }
            }
            assert_eq!(
                missing, 0,
                "merge 3: {} of {} docs missing from store",
                missing, total_docs
            );
            assert_eq!(
                corrupt, 0,
                "merge 3: {} of {} docs have corrupt fields",
                corrupt, total_docs
            );
        }

        eprintln!("All {} docs verified across 3 merge rounds", total_docs);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
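    // Larger-scale variant: four rounds of 800 docs with ~1 KB bodies, each round
    // ending in a force-merge followed by a full store verification pass.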
    async fn test_store_large_scale_multi_merge() {
        use crate::directories::MmapDirectory;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", false, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 1024 * 256,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::NoMergePolicy),
            ..Default::default()
        };

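        // ~1 KB of body text per doc ("x".repeat(950) plus a short prefix).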
        let make_doc = |round: usize, idx: usize| -> Document {
            let mut doc = Document::new();
            doc.add_text(title, format!("r{}_i{}_needle", round, idx));
            doc.add_text(body, format!("r{}i{} {}", round, idx, "x".repeat(950)));
            doc
        };

        let mut total_docs = 0u32;

        for round in 0..4 {
            let docs_this_round = 800u32;

            {
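                // Write this round's docs in four committed batches, then force-merge.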
                let mut writer = if round == 0 {
                    IndexWriter::create(dir.clone(), schema.clone(), config.clone())
                        .await
                        .unwrap()
                } else {
                    IndexWriter::open(dir.clone(), config.clone())
                        .await
                        .unwrap()
                };
                for batch in 0..4 {
                    for i in 0..docs_this_round / 4 {
                        writer
                            .add_document(make_doc(
                                round,
                                (batch * (docs_this_round / 4) + i) as usize,
                            ))
                            .unwrap();
                    }
                    writer.commit().await.unwrap();
                }
                total_docs += docs_this_round;
                writer.force_merge().await.unwrap();
            }

            {
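                // Verify that every doc written so far survives in the merged store.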
                let index = Index::open(dir.clone(), config.clone()).await.unwrap();
                assert_eq!(index.num_docs().await.unwrap(), total_docs);
                let reader = index.reader().await.unwrap();
                let searcher = reader.searcher().await.unwrap();
                assert_eq!(
                    searcher.num_segments(),
                    1,
                    "round {}: expected 1 segment",
                    round
                );
                let seg = &searcher.segment_readers()[0];
                let seg_id = seg.meta().id;
                assert_eq!(
                    seg.store().num_docs(),
                    seg.num_docs(),
                    "round {}: store/meta mismatch",
                    round
                );
                let mut missing = 0u32;
                for i in 0..total_docs {
                    match searcher.doc(seg_id, i).await {
                        Ok(Some(doc)) => {
                            let t = doc.get_first(title);
                            assert!(
                                t.is_some() && t.unwrap().as_text().unwrap().contains("needle"),
                                "round {}: doc {} corrupt",
                                round,
                                i
                            );
                        }
                        Ok(None) => missing += 1,
                        Err(e) => panic!("round {}: doc {} error: {}", round, i, e),
                    }
                }
                assert_eq!(
                    missing, 0,
                    "round {}: {} of {} docs missing",
                    round, missing, total_docs
                );
            }

            eprintln!(
                "Round {}: {} docs verified ({} total)",
                round, docs_this_round, total_docs
            );
        }
        eprintln!(
            "All {} docs verified across 4 large-scale merge rounds",
            total_docs
        );
    }
}