1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39 index_documents_from_reader, index_json_document, parse_schema,
40};
41
42pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
/// Tunable knobs for indexing throughput, cache sizes, merging, and
/// reader refresh behaviour.
///
/// NOTE(review): `Clone` on a `Box<dyn MergePolicy>` field implies the
/// crate provides a `Clone` impl for that trait object (cf. `clone_box`).
#[derive(Debug, Clone)]
pub struct IndexConfig {
    // Total worker-thread budget (defaults to the indexing-thread count;
    // exact relationship to `num_indexing_threads` not visible here).
    pub num_threads: usize,
    // Threads dedicated to document indexing.
    pub num_indexing_threads: usize,
    // Threads dedicated to compression work.
    pub num_compression_threads: usize,
    // Cache blocks for the term dictionary (passed to SegmentManager/IndexReader).
    pub term_cache_blocks: usize,
    // Cache blocks for the document store.
    pub store_cache_blocks: usize,
    // In-memory budget before a flush; tests shrink this to force many segments.
    pub max_indexing_memory_bytes: usize,
    // Strategy that selects which segments to merge.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    // Index layout/encoding optimization settings.
    pub optimization: crate::structures::IndexOptimization,
    // How often (ms) a reader refreshes its view of published segments.
    pub reload_interval_ms: u64,
    // Cap on merges running at the same time.
    pub max_concurrent_merges: usize,
}
69
70impl Default for IndexConfig {
71 fn default() -> Self {
72 #[cfg(feature = "native")]
73 let indexing_threads = crate::default_indexing_threads();
74 #[cfg(not(feature = "native"))]
75 let indexing_threads = 1;
76
77 #[cfg(feature = "native")]
78 let compression_threads = crate::default_compression_threads();
79 #[cfg(not(feature = "native"))]
80 let compression_threads = 1;
81
82 Self {
83 num_threads: indexing_threads,
84 num_indexing_threads: 1,
85 num_compression_threads: compression_threads,
86 term_cache_blocks: 256,
87 store_cache_blocks: 32,
88 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90 optimization: crate::structures::IndexOptimization::default(),
91 reload_interval_ms: 1000, max_concurrent_merges: 4,
93 }
94 }
95}
96
/// A search index bound to a directory: holds the schema, configuration,
/// the segment manager, and a lazily-initialized shared reader.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    // Tracks live segments and coordinates merges.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // Created on first call to `Index::reader` and reused afterwards.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120 let directory = Arc::new(directory);
121 let schema = Arc::new(schema);
122 let metadata = IndexMetadata::new((*schema).clone());
123
124 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125 Arc::clone(&directory),
126 Arc::clone(&schema),
127 metadata,
128 config.merge_policy.clone_box(),
129 config.term_cache_blocks,
130 config.max_concurrent_merges,
131 ));
132
133 segment_manager.update_metadata(|_| {}).await?;
135
136 Ok(Self {
137 directory,
138 schema,
139 config,
140 segment_manager,
141 cached_reader: tokio::sync::OnceCell::new(),
142 })
143 }
144
145 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
147 let directory = Arc::new(directory);
148
149 let metadata = IndexMetadata::load(directory.as_ref()).await?;
151 let schema = Arc::new(metadata.schema.clone());
152
153 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
154 Arc::clone(&directory),
155 Arc::clone(&schema),
156 metadata,
157 config.merge_policy.clone_box(),
158 config.term_cache_blocks,
159 config.max_concurrent_merges,
160 ));
161
162 segment_manager.load_and_publish_trained().await;
164
165 Ok(Self {
166 directory,
167 schema,
168 config,
169 segment_manager,
170 cached_reader: tokio::sync::OnceCell::new(),
171 })
172 }
173
174 pub fn schema(&self) -> &Schema {
176 &self.schema
177 }
178
179 pub fn directory(&self) -> &D {
181 &self.directory
182 }
183
184 pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
186 &self.segment_manager
187 }
188
189 pub async fn reader(&self) -> Result<&IndexReader<D>> {
194 self.cached_reader
195 .get_or_try_init(|| async {
196 IndexReader::from_segment_manager(
197 Arc::clone(&self.schema),
198 Arc::clone(&self.segment_manager),
199 self.config.term_cache_blocks,
200 self.config.reload_interval_ms,
201 )
202 .await
203 })
204 .await
205 }
206
207 pub fn config(&self) -> &IndexConfig {
209 &self.config
210 }
211
212 pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
214 let reader = self.reader().await?;
215 let searcher = reader.searcher().await?;
216 Ok(searcher.segment_readers().to_vec())
217 }
218
219 pub async fn num_docs(&self) -> Result<u32> {
221 let reader = self.reader().await?;
222 let searcher = reader.searcher().await?;
223 Ok(searcher.num_docs())
224 }
225
226 pub fn default_fields(&self) -> Vec<crate::Field> {
228 if !self.schema.default_fields().is_empty() {
229 self.schema.default_fields().to_vec()
230 } else {
231 self.schema
232 .fields()
233 .filter(|(_, entry)| {
234 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
235 })
236 .map(|(field, _)| field)
237 .collect()
238 }
239 }
240
241 pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
243 Arc::new(crate::tokenizer::TokenizerRegistry::default())
244 }
245
246 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
248 let default_fields = self.default_fields();
249 let tokenizers = self.tokenizers();
250
251 let query_routers = self.schema.query_routers();
252 if !query_routers.is_empty()
253 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
254 {
255 return crate::dsl::QueryLanguageParser::with_router(
256 Arc::clone(&self.schema),
257 default_fields,
258 tokenizers,
259 router,
260 );
261 }
262
263 crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
264 }
265
266 pub async fn query(
268 &self,
269 query_str: &str,
270 limit: usize,
271 ) -> Result<crate::query::SearchResponse> {
272 self.query_offset(query_str, limit, 0).await
273 }
274
275 pub async fn query_offset(
277 &self,
278 query_str: &str,
279 limit: usize,
280 offset: usize,
281 ) -> Result<crate::query::SearchResponse> {
282 let parser = self.query_parser();
283 let query = parser
284 .parse(query_str)
285 .map_err(crate::error::Error::Query)?;
286 self.search_offset(query.as_ref(), limit, offset).await
287 }
288
289 pub async fn search(
291 &self,
292 query: &dyn crate::query::Query,
293 limit: usize,
294 ) -> Result<crate::query::SearchResponse> {
295 self.search_offset(query, limit, 0).await
296 }
297
298 pub async fn search_offset(
300 &self,
301 query: &dyn crate::query::Query,
302 limit: usize,
303 offset: usize,
304 ) -> Result<crate::query::SearchResponse> {
305 let reader = self.reader().await?;
306 let searcher = reader.searcher().await?;
307 let segments = searcher.segment_readers();
308
309 let fetch_limit = offset + limit;
310
311 let futures: Vec<_> = segments
312 .iter()
313 .map(|segment| {
314 let sid = segment.meta().id;
315 async move {
316 let results =
317 crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
318 Ok::<_, crate::error::Error>(
319 results
320 .into_iter()
321 .map(move |r| (sid, r))
322 .collect::<Vec<_>>(),
323 )
324 }
325 })
326 .collect();
327
328 let batches = futures::future::try_join_all(futures).await?;
329 let mut all_results: Vec<(u128, crate::query::SearchResult)> =
330 Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
331 for batch in batches {
332 all_results.extend(batch);
333 }
334
335 all_results.sort_by(|a, b| {
336 b.1.score
337 .partial_cmp(&a.1.score)
338 .unwrap_or(std::cmp::Ordering::Equal)
339 });
340
341 let total_hits = all_results.len() as u32;
342
343 let hits: Vec<crate::query::SearchHit> = all_results
344 .into_iter()
345 .skip(offset)
346 .take(limit)
347 .map(|(segment_id, result)| crate::query::SearchHit {
348 address: crate::query::DocAddress::new(segment_id, result.doc_id),
349 score: result.score,
350 matched_fields: result.extract_ordinals(),
351 })
352 .collect();
353
354 Ok(crate::query::SearchResponse { hits, total_hits })
355 }
356
357 pub async fn get_document(
359 &self,
360 address: &crate::query::DocAddress,
361 ) -> Result<Option<crate::dsl::Document>> {
362 let reader = self.reader().await?;
363 let searcher = reader.searcher().await?;
364 searcher.get_document(address).await
365 }
366
367 pub async fn get_postings(
369 &self,
370 field: crate::Field,
371 term: &[u8],
372 ) -> Result<
373 Vec<(
374 Arc<crate::segment::SegmentReader>,
375 crate::structures::BlockPostingList,
376 )>,
377 > {
378 let segments = self.segment_readers().await?;
379 let mut results = Vec::new();
380
381 for segment in segments {
382 if let Some(postings) = segment.get_postings(field, term).await? {
383 results.push((segment, postings));
384 }
385 }
386
387 Ok(results)
388 }
389}
390
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates an [`IndexWriter`] bound to this index's directory, schema,
    /// and configuration.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403 use crate::directories::RamDirectory;
404 use crate::dsl::{Document, SchemaBuilder};
405
406 #[tokio::test]
407 async fn test_index_create_and_search() {
408 let mut schema_builder = SchemaBuilder::default();
409 let title = schema_builder.add_text_field("title", true, true);
410 let body = schema_builder.add_text_field("body", true, true);
411 let schema = schema_builder.build();
412
413 let dir = RamDirectory::new();
414 let config = IndexConfig::default();
415
416 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
418 .await
419 .unwrap();
420
421 let mut doc1 = Document::new();
422 doc1.add_text(title, "Hello World");
423 doc1.add_text(body, "This is the first document");
424 writer.add_document(doc1).unwrap();
425
426 let mut doc2 = Document::new();
427 doc2.add_text(title, "Goodbye World");
428 doc2.add_text(body, "This is the second document");
429 writer.add_document(doc2).unwrap();
430
431 writer.commit().await.unwrap();
432
433 let index = Index::open(dir, config).await.unwrap();
435 assert_eq!(index.num_docs().await.unwrap(), 2);
436
437 let postings = index.get_postings(title, b"world").await.unwrap();
439 assert_eq!(postings.len(), 1); assert_eq!(postings[0].1.doc_count(), 2); let reader = index.reader().await.unwrap();
444 let searcher = reader.searcher().await.unwrap();
445 let doc = searcher.doc(0).await.unwrap().unwrap();
446 assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
447 }
448
449 #[tokio::test]
450 async fn test_multiple_segments() {
451 let mut schema_builder = SchemaBuilder::default();
452 let title = schema_builder.add_text_field("title", true, true);
453 let schema = schema_builder.build();
454
455 let dir = RamDirectory::new();
456 let config = IndexConfig {
457 max_indexing_memory_bytes: 1024, ..Default::default()
459 };
460
461 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
462 .await
463 .unwrap();
464
465 for batch in 0..3 {
467 for i in 0..5 {
468 let mut doc = Document::new();
469 doc.add_text(title, format!("Document {} batch {}", i, batch));
470 writer.add_document(doc).unwrap();
471 }
472 writer.commit().await.unwrap();
473 }
474
475 let index = Index::open(dir, config).await.unwrap();
477 assert_eq!(index.num_docs().await.unwrap(), 15);
478 assert!(
480 index.segment_readers().await.unwrap().len() >= 2,
481 "Expected multiple segments"
482 );
483 }
484
485 #[tokio::test]
486 async fn test_segment_merge() {
487 let mut schema_builder = SchemaBuilder::default();
488 let title = schema_builder.add_text_field("title", true, true);
489 let schema = schema_builder.build();
490
491 let dir = RamDirectory::new();
492 let config = IndexConfig {
493 max_indexing_memory_bytes: 512, ..Default::default()
495 };
496
497 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
498 .await
499 .unwrap();
500
501 for batch in 0..3 {
503 for i in 0..3 {
504 let mut doc = Document::new();
505 doc.add_text(title, format!("Document {} batch {}", i, batch));
506 writer.add_document(doc).unwrap();
507 }
508 writer.commit().await.unwrap();
509 }
510
511 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
513 assert!(
514 index.segment_readers().await.unwrap().len() >= 2,
515 "Expected multiple segments"
516 );
517
518 let mut writer = IndexWriter::open(dir.clone(), config.clone())
520 .await
521 .unwrap();
522 writer.force_merge().await.unwrap();
523
524 let index = Index::open(dir, config).await.unwrap();
526 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
527 assert_eq!(index.num_docs().await.unwrap(), 9);
528
529 let reader = index.reader().await.unwrap();
531 let searcher = reader.searcher().await.unwrap();
532 let mut found_docs = 0;
533 for i in 0..9 {
534 if searcher.doc(i).await.unwrap().is_some() {
535 found_docs += 1;
536 }
537 }
538 assert_eq!(found_docs, 9);
539 }
540
541 #[tokio::test]
542 async fn test_match_query() {
543 let mut schema_builder = SchemaBuilder::default();
544 let title = schema_builder.add_text_field("title", true, true);
545 let body = schema_builder.add_text_field("body", true, true);
546 let schema = schema_builder.build();
547
548 let dir = RamDirectory::new();
549 let config = IndexConfig::default();
550
551 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
552 .await
553 .unwrap();
554
555 let mut doc1 = Document::new();
556 doc1.add_text(title, "rust programming");
557 doc1.add_text(body, "Learn rust language");
558 writer.add_document(doc1).unwrap();
559
560 let mut doc2 = Document::new();
561 doc2.add_text(title, "python programming");
562 doc2.add_text(body, "Learn python language");
563 writer.add_document(doc2).unwrap();
564
565 writer.commit().await.unwrap();
566
567 let index = Index::open(dir, config).await.unwrap();
568
569 let results = index.query("rust", 10).await.unwrap();
571 assert_eq!(results.hits.len(), 1);
572
573 let results = index.query("rust programming", 10).await.unwrap();
575 assert!(!results.hits.is_empty());
576
577 let hit = &results.hits[0];
579 assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
580
581 let doc = index.get_document(&hit.address).await.unwrap().unwrap();
583 assert!(
584 !doc.field_values().is_empty(),
585 "Doc should have field values"
586 );
587
588 let reader = index.reader().await.unwrap();
590 let searcher = reader.searcher().await.unwrap();
591 let doc = searcher.doc(0).await.unwrap().unwrap();
592 assert!(
593 !doc.field_values().is_empty(),
594 "Doc should have field values"
595 );
596 }
597
598 #[tokio::test]
599 async fn test_slice_cache_warmup_and_load() {
600 use crate::directories::SliceCachingDirectory;
601
602 let mut schema_builder = SchemaBuilder::default();
603 let title = schema_builder.add_text_field("title", true, true);
604 let body = schema_builder.add_text_field("body", true, true);
605 let schema = schema_builder.build();
606
607 let dir = RamDirectory::new();
608 let config = IndexConfig::default();
609
610 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
612 .await
613 .unwrap();
614
615 for i in 0..10 {
616 let mut doc = Document::new();
617 doc.add_text(title, format!("Document {} about rust", i));
618 doc.add_text(body, format!("This is body text number {}", i));
619 writer.add_document(doc).unwrap();
620 }
621 writer.commit().await.unwrap();
622
623 let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
625 let index = Index::open(caching_dir, config.clone()).await.unwrap();
626
627 let results = index.query("rust", 10).await.unwrap();
629 assert!(!results.hits.is_empty());
630
631 let stats = index.directory.stats();
633 assert!(stats.total_bytes > 0, "Cache should have data after search");
634 }
635
636 #[tokio::test]
637 async fn test_multivalue_field_indexing_and_search() {
638 let mut schema_builder = SchemaBuilder::default();
639 let uris = schema_builder.add_text_field("uris", true, true);
640 let title = schema_builder.add_text_field("title", true, true);
641 let schema = schema_builder.build();
642
643 let dir = RamDirectory::new();
644 let config = IndexConfig::default();
645
646 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
648 .await
649 .unwrap();
650
651 let mut doc = Document::new();
652 doc.add_text(uris, "one");
653 doc.add_text(uris, "two");
654 doc.add_text(title, "Test Document");
655 writer.add_document(doc).unwrap();
656
657 let mut doc2 = Document::new();
659 doc2.add_text(uris, "three");
660 doc2.add_text(title, "Another Document");
661 writer.add_document(doc2).unwrap();
662
663 writer.commit().await.unwrap();
664
665 let index = Index::open(dir, config).await.unwrap();
667 assert_eq!(index.num_docs().await.unwrap(), 2);
668
669 let reader = index.reader().await.unwrap();
671 let searcher = reader.searcher().await.unwrap();
672 let doc = searcher.doc(0).await.unwrap().unwrap();
673 let all_uris: Vec<_> = doc.get_all(uris).collect();
674 assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
675 assert_eq!(all_uris[0].as_text(), Some("one"));
676 assert_eq!(all_uris[1].as_text(), Some("two"));
677
678 let json = doc.to_json(index.schema());
680 let uris_json = json.get("uris").unwrap();
681 assert!(uris_json.is_array(), "Multi-value field should be an array");
682 let uris_arr = uris_json.as_array().unwrap();
683 assert_eq!(uris_arr.len(), 2);
684 assert_eq!(uris_arr[0].as_str(), Some("one"));
685 assert_eq!(uris_arr[1].as_str(), Some("two"));
686
687 let results = index.query("uris:one", 10).await.unwrap();
689 assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
690 assert_eq!(results.hits[0].address.doc_id, 0);
691
692 let results = index.query("uris:two", 10).await.unwrap();
693 assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
694 assert_eq!(results.hits[0].address.doc_id, 0);
695
696 let results = index.query("uris:three", 10).await.unwrap();
697 assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
698 assert_eq!(results.hits[0].address.doc_id, 1);
699
700 let results = index.query("uris:nonexistent", 10).await.unwrap();
702 assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
703 }
704
705 #[tokio::test]
712 async fn test_wand_optimization_for_or_queries() {
713 use crate::query::{BooleanQuery, TermQuery};
714
715 let mut schema_builder = SchemaBuilder::default();
716 let content = schema_builder.add_text_field("content", true, true);
717 let schema = schema_builder.build();
718
719 let dir = RamDirectory::new();
720 let config = IndexConfig::default();
721
722 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
724 .await
725 .unwrap();
726
727 let mut doc = Document::new();
729 doc.add_text(content, "rust programming language is fast");
730 writer.add_document(doc).unwrap();
731
732 let mut doc = Document::new();
734 doc.add_text(content, "rust is a systems language");
735 writer.add_document(doc).unwrap();
736
737 let mut doc = Document::new();
739 doc.add_text(content, "programming is fun");
740 writer.add_document(doc).unwrap();
741
742 let mut doc = Document::new();
744 doc.add_text(content, "python is easy to learn");
745 writer.add_document(doc).unwrap();
746
747 let mut doc = Document::new();
749 doc.add_text(content, "rust rust programming programming systems");
750 writer.add_document(doc).unwrap();
751
752 writer.commit().await.unwrap();
753
754 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
756
757 let or_query = BooleanQuery::new()
759 .should(TermQuery::text(content, "rust"))
760 .should(TermQuery::text(content, "programming"));
761
762 let results = index.search(&or_query, 10).await.unwrap();
763
764 assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
766
767 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
768 assert!(doc_ids.contains(&0), "Should find doc 0");
769 assert!(doc_ids.contains(&1), "Should find doc 1");
770 assert!(doc_ids.contains(&2), "Should find doc 2");
771 assert!(doc_ids.contains(&4), "Should find doc 4");
772 assert!(
773 !doc_ids.contains(&3),
774 "Should NOT find doc 3 (only has 'python')"
775 );
776
777 let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
779
780 let results = index.search(&single_query, 10).await.unwrap();
781 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
782
783 let must_query = BooleanQuery::new()
785 .must(TermQuery::text(content, "rust"))
786 .should(TermQuery::text(content, "programming"));
787
788 let results = index.search(&must_query, 10).await.unwrap();
789 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
791
792 let must_not_query = BooleanQuery::new()
794 .should(TermQuery::text(content, "rust"))
795 .should(TermQuery::text(content, "programming"))
796 .must_not(TermQuery::text(content, "systems"));
797
798 let results = index.search(&must_not_query, 10).await.unwrap();
799 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
801 assert!(
802 !doc_ids.contains(&1),
803 "Should NOT find doc 1 (has 'systems')"
804 );
805 assert!(
806 !doc_ids.contains(&4),
807 "Should NOT find doc 4 (has 'systems')"
808 );
809
810 let or_query = BooleanQuery::new()
812 .should(TermQuery::text(content, "rust"))
813 .should(TermQuery::text(content, "programming"));
814
815 let results = index.search(&or_query, 2).await.unwrap();
816 assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
817
818 }
821
822 #[tokio::test]
824 async fn test_wand_results_match_standard_boolean() {
825 use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
826
827 let mut schema_builder = SchemaBuilder::default();
828 let content = schema_builder.add_text_field("content", true, true);
829 let schema = schema_builder.build();
830
831 let dir = RamDirectory::new();
832 let config = IndexConfig::default();
833
834 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
835 .await
836 .unwrap();
837
838 for i in 0..10 {
840 let mut doc = Document::new();
841 let text = match i % 4 {
842 0 => "apple banana cherry",
843 1 => "apple orange",
844 2 => "banana grape",
845 _ => "cherry date",
846 };
847 doc.add_text(content, text);
848 writer.add_document(doc).unwrap();
849 }
850
851 writer.commit().await.unwrap();
852 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
853
854 let wand_query = WandOrQuery::new(content).term("apple").term("banana");
856
857 let bool_query = BooleanQuery::new()
858 .should(TermQuery::text(content, "apple"))
859 .should(TermQuery::text(content, "banana"));
860
861 let wand_results = index.search(&wand_query, 10).await.unwrap();
862 let bool_results = index.search(&bool_query, 10).await.unwrap();
863
864 assert_eq!(
866 wand_results.hits.len(),
867 bool_results.hits.len(),
868 "WAND and Boolean should find same number of docs"
869 );
870
871 let wand_docs: std::collections::HashSet<u32> =
872 wand_results.hits.iter().map(|h| h.address.doc_id).collect();
873 let bool_docs: std::collections::HashSet<u32> =
874 bool_results.hits.iter().map(|h| h.address.doc_id).collect();
875
876 assert_eq!(
877 wand_docs, bool_docs,
878 "WAND and Boolean should find same documents"
879 );
880 }
881
882 #[tokio::test]
883 async fn test_vector_index_threshold_switch() {
884 use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
885
886 let mut schema_builder = SchemaBuilder::default();
888 let title = schema_builder.add_text_field("title", true, true);
889 let embedding = schema_builder.add_dense_vector_field_with_config(
890 "embedding",
891 true, true, DenseVectorConfig {
894 dim: 8,
895 index_type: VectorIndexType::IvfRaBitQ,
896 quantization: DenseVectorQuantization::F32,
897 num_clusters: Some(4), nprobe: 2,
899 build_threshold: Some(50), },
901 );
902 let schema = schema_builder.build();
903
904 let dir = RamDirectory::new();
905 let config = IndexConfig::default();
906
907 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
909 .await
910 .unwrap();
911
912 for i in 0..30 {
914 let mut doc = Document::new();
915 doc.add_text(title, format!("Document {}", i));
916 let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
918 doc.add_dense_vector(embedding, vec);
919 writer.add_document(doc).unwrap();
920 }
921 writer.commit().await.unwrap();
922
923 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
925 assert!(
926 index.segment_manager.trained().is_none(),
927 "Should not have trained centroids below threshold"
928 );
929
930 let query_vec: Vec<f32> = vec![0.5; 8];
932 let segments = index.segment_readers().await.unwrap();
933 assert!(!segments.is_empty());
934
935 let results = segments[0]
936 .search_dense_vector(
937 embedding,
938 &query_vec,
939 5,
940 0,
941 1,
942 crate::query::MultiValueCombiner::Max,
943 )
944 .await
945 .unwrap();
946 assert!(!results.is_empty(), "Flat search should return results");
947
948 let mut writer = IndexWriter::open(dir.clone(), config.clone())
950 .await
951 .unwrap();
952
953 for i in 30..60 {
955 let mut doc = Document::new();
956 doc.add_text(title, format!("Document {}", i));
957 let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
958 doc.add_dense_vector(embedding, vec);
959 writer.add_document(doc).unwrap();
960 }
961 writer.commit().await.unwrap();
962
963 writer.build_vector_index().await.unwrap();
965
966 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
968 assert!(
969 index.segment_manager.trained().is_some(),
970 "Should have loaded trained centroids for embedding field"
971 );
972
973 let segments = index.segment_readers().await.unwrap();
975 let results = segments[0]
976 .search_dense_vector(
977 embedding,
978 &query_vec,
979 5,
980 0,
981 1,
982 crate::query::MultiValueCombiner::Max,
983 )
984 .await
985 .unwrap();
986 assert!(
987 !results.is_empty(),
988 "Search should return results after build"
989 );
990
991 let writer = IndexWriter::open(dir.clone(), config.clone())
993 .await
994 .unwrap();
995 writer.build_vector_index().await.unwrap(); assert!(writer.segment_manager.trained().is_some());
999 }
1000
    /// Two rounds of ingest + force-merge, verifying doc counts, term hit
    /// counts, and per-doc retrievability after each merge.
    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        // Tiny memory budget forces frequent segment creation.
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Round 1: 5 batches x 10 docs = 50 docs, committed per batch.
        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // All round-1 docs contain both "alpha" (title) and "fox" (body).
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        // Merge 1: collapse everything into a single segment.
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        // Every doc id must remain retrievable after merge 1.
        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        // Round 2: 3 batches x 10 docs with a distinct vocabulary ("delta"),
        // sharing only "fox" with round 1.
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        // Merge 2: back down to a single segment with all 80 docs.
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        // Every doc id must remain retrievable after merge 2.
        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }
1151
    /// 8 batches x 25 docs (200 total) with shared and per-batch unique
    /// terms; verifies hit counts and doc retrieval before and after a
    /// force-merge to one segment.
    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        // Tiny memory budget forces many small segments.
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 200u32;
        // "common" appears in every doc; "unique_<batch>" in 25 docs each.
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        // Pre-merge: each batch-unique term matches exactly its 25 docs.
        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Post-merge: single segment, same counts.
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        // Every doc id must still resolve to a stored document.
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        for i in 0..total_docs {
            let doc = searcher.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }
1231
1232 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1236 async fn test_auto_merge_triggered() {
1237 use crate::directories::MmapDirectory;
1238 let tmp_dir = tempfile::tempdir().unwrap();
1239 let dir = MmapDirectory::new(tmp_dir.path());
1240
1241 let mut schema_builder = SchemaBuilder::default();
1242 let title = schema_builder.add_text_field("title", true, true);
1243 let body = schema_builder.add_text_field("body", true, true);
1244 let schema = schema_builder.build();
1245
1246 let config = IndexConfig {
1248 max_indexing_memory_bytes: 4096,
1249 num_indexing_threads: 4,
1250 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1251 ..Default::default()
1252 };
1253
1254 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1255 .await
1256 .unwrap();
1257
1258 for batch in 0..12 {
1260 for i in 0..50 {
1261 let mut doc = Document::new();
1262 doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1263 doc.add_text(
1264 body,
1265 format!(
1266 "the quick brown fox jumps over lazy dog number {} round {}",
1267 i, batch
1268 ),
1269 );
1270 writer.add_document(doc).unwrap();
1271 }
1272 writer.commit().await.unwrap();
1273 }
1274
1275 let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1276
1277 writer.wait_for_merging_thread().await;
1280 writer.maybe_merge().await;
1281 writer.wait_for_merging_thread().await;
1282
1283 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1285 let segment_count = index.segment_readers().await.unwrap().len();
1286 eprintln!(
1287 "Segments: {} before merge, {} after auto-merge",
1288 pre_merge, segment_count
1289 );
1290 assert!(
1291 segment_count < pre_merge,
1292 "Expected auto-merge to reduce segments from {}, got {}",
1293 pre_merge,
1294 segment_count
1295 );
1296 }
1297
1298 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1303 async fn test_commit_with_vectors_and_background_merge() {
1304 use crate::directories::MmapDirectory;
1305 use crate::dsl::DenseVectorConfig;
1306
1307 let tmp_dir = tempfile::tempdir().unwrap();
1308 let dir = MmapDirectory::new(tmp_dir.path());
1309
1310 let mut schema_builder = SchemaBuilder::default();
1311 let title = schema_builder.add_text_field("title", true, true);
1312 let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1314 let embedding =
1315 schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1316 let schema = schema_builder.build();
1317
1318 let config = IndexConfig {
1320 max_indexing_memory_bytes: 4096,
1321 num_indexing_threads: 4,
1322 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1323 ..Default::default()
1324 };
1325
1326 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1327 .await
1328 .unwrap();
1329
1330 for batch in 0..12 {
1333 for i in 0..5 {
1334 let mut doc = Document::new();
1335 doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1336 let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1338 doc.add_dense_vector(embedding, vec);
1339 writer.add_document(doc).unwrap();
1340 }
1341 writer.commit().await.unwrap();
1342 }
1343 writer.wait_for_merging_thread().await;
1344
1345 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1346 let num_docs = index.num_docs().await.unwrap();
1347 assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1348 }
1349
1350 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1353 async fn test_force_merge_many_segments() {
1354 use crate::directories::MmapDirectory;
1355 let tmp_dir = tempfile::tempdir().unwrap();
1356 let dir = MmapDirectory::new(tmp_dir.path());
1357
1358 let mut schema_builder = SchemaBuilder::default();
1359 let title = schema_builder.add_text_field("title", true, true);
1360 let schema = schema_builder.build();
1361
1362 let config = IndexConfig {
1363 max_indexing_memory_bytes: 512,
1364 ..Default::default()
1365 };
1366
1367 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1368 .await
1369 .unwrap();
1370
1371 for batch in 0..50 {
1373 for i in 0..3 {
1374 let mut doc = Document::new();
1375 doc.add_text(title, format!("term_{} batch_{}", i, batch));
1376 writer.add_document(doc).unwrap();
1377 }
1378 writer.commit().await.unwrap();
1379 }
1380 writer.wait_for_merging_thread().await;
1382
1383 let seg_ids = writer.segment_manager.get_segment_ids().await;
1384 let pre = seg_ids.len();
1385 eprintln!("Segments before force_merge: {}", pre);
1386 assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1387
1388 writer.force_merge().await.unwrap();
1390
1391 let index2 = Index::open(dir, config).await.unwrap();
1392 let post = index2.segment_readers().await.unwrap().len();
1393 eprintln!("Segments after force_merge: {}", post);
1394 assert_eq!(post, 1);
1395 assert_eq!(index2.num_docs().await.unwrap(), 150);
1396 }
1397
1398 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1402 async fn test_background_merge_generation() {
1403 use crate::directories::MmapDirectory;
1404 let tmp_dir = tempfile::tempdir().unwrap();
1405 let dir = MmapDirectory::new(tmp_dir.path());
1406
1407 let mut schema_builder = SchemaBuilder::default();
1408 let title = schema_builder.add_text_field("title", true, true);
1409 let schema = schema_builder.build();
1410
1411 let config = IndexConfig {
1412 max_indexing_memory_bytes: 4096,
1413 num_indexing_threads: 2,
1414 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1415 ..Default::default()
1416 };
1417
1418 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1419 .await
1420 .unwrap();
1421
1422 for batch in 0..15 {
1424 for i in 0..5 {
1425 let mut doc = Document::new();
1426 doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1427 writer.add_document(doc).unwrap();
1428 }
1429 writer.commit().await.unwrap();
1430 }
1431 writer.wait_for_merging_thread().await;
1432
1433 let metas = writer
1435 .segment_manager
1436 .read_metadata(|m| m.segment_metas.clone())
1437 .await;
1438
1439 let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1440 eprintln!(
1441 "Segments after merge: {}, max generation: {}",
1442 metas.len(),
1443 max_gen
1444 );
1445
1446 assert!(
1448 max_gen >= 1,
1449 "Expected at least one merged segment (gen >= 1), got max_gen={}",
1450 max_gen
1451 );
1452
1453 for (id, info) in &metas {
1455 if info.generation > 0 {
1456 assert!(
1457 !info.ancestors.is_empty(),
1458 "Segment {} has gen={} but no ancestors",
1459 id,
1460 info.generation
1461 );
1462 } else {
1463 assert!(
1464 info.ancestors.is_empty(),
1465 "Fresh segment {} has gen=0 but has ancestors",
1466 id
1467 );
1468 }
1469 }
1470 }
1471
1472 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1476 async fn test_merge_preserves_all_documents() {
1477 use crate::directories::MmapDirectory;
1478 let tmp_dir = tempfile::tempdir().unwrap();
1479 let dir = MmapDirectory::new(tmp_dir.path());
1480
1481 let mut schema_builder = SchemaBuilder::default();
1482 let title = schema_builder.add_text_field("title", true, true);
1483 let schema = schema_builder.build();
1484
1485 let config = IndexConfig {
1486 max_indexing_memory_bytes: 4096,
1487 ..Default::default()
1488 };
1489
1490 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1491 .await
1492 .unwrap();
1493
1494 let total_docs = 1200;
1495 let docs_per_batch = 60;
1496 let batches = total_docs / docs_per_batch;
1497
1498 for batch in 0..batches {
1500 for i in 0..docs_per_batch {
1501 let doc_num = batch * docs_per_batch + i;
1502 let mut doc = Document::new();
1503 doc.add_text(
1504 title,
1505 format!("uid_{} common_term batch_{}", doc_num, batch),
1506 );
1507 writer.add_document(doc).unwrap();
1508 }
1509 writer.commit().await.unwrap();
1510 }
1511
1512 let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1513 assert!(
1514 pre_segments >= 2,
1515 "Need multiple segments, got {}",
1516 pre_segments
1517 );
1518
1519 writer.force_merge().await.unwrap();
1521
1522 let index = Index::open(dir, config).await.unwrap();
1523 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1524 assert_eq!(
1525 index.num_docs().await.unwrap(),
1526 total_docs as u32,
1527 "Doc count mismatch after force_merge"
1528 );
1529
1530 let results = index.query("common_term", total_docs + 100).await.unwrap();
1532 assert_eq!(
1533 results.hits.len(),
1534 total_docs,
1535 "common_term should match all docs"
1536 );
1537
1538 for check in [0, 1, total_docs / 2, total_docs - 1] {
1540 let q = format!("uid_{}", check);
1541 let results = index.query(&q, 10).await.unwrap();
1542 assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1543 }
1544 }
1545
    /// Runs several indexing rounds against an aggressive background merger,
    /// checking after every round that the per-segment doc counts recorded in
    /// metadata sum to exactly the number of documents added so far, and at
    /// the end that search still sees every document.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multi_round_merge_doc_integrity() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        // Small memory budget + aggressive policy so merges run between rounds.
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Running total of documents added across all rounds.
        let mut expected_total = 0u64;

        for round in 0..4 {
            // Rounds grow: 50, 75, 100, 125 docs, split over 5 commits each.
            let docs_this_round = 50 + round * 25;
            for batch in 0..5 {
                for i in 0..docs_this_round / 5 {
                    let mut doc = Document::new();
                    doc.add_text(
                        title,
                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
                    );
                    writer.add_document(doc).unwrap();
                }
                writer.commit().await.unwrap();
            }
            // Let any background merge triggered by these commits finish
            // before inspecting metadata.
            writer.wait_for_merging_thread().await;

            expected_total += docs_this_round as u64;

            // Sum of per-segment doc counts must equal everything added so far,
            // regardless of how segments were merged in the meantime.
            let actual = writer
                .segment_manager
                .read_metadata(|m| {
                    m.segment_metas
                        .values()
                        .map(|s| s.num_docs as u64)
                        .sum::<u64>()
                })
                .await;

            assert_eq!(
                actual, expected_total,
                "Round {}: expected {} docs, metadata reports {}",
                round, expected_total, actual
            );
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);

        // Every document carries the term "searchable"; all must be found.
        let results = index
            .query("searchable", expected_total as usize + 100)
            .await
            .unwrap();
        assert_eq!(
            results.hits.len(),
            expected_total as usize,
            "All docs should match 'searchable'"
        );

        // With four merge-heavy rounds, at least one merged (gen >= 1)
        // segment should exist in the final metadata.
        let metas = index
            .segment_manager()
            .read_metadata(|m| m.segment_metas.clone())
            .await;
        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Final: {} segments, {} docs, max generation={}",
            metas.len(),
            expected_total,
            max_gen
        );
        assert!(
            max_gen >= 1,
            "Multiple merge rounds should produce gen >= 1"
        );
    }
1637
    /// Sustained indexing: 40 commits of 30 docs each while merges run
    /// concurrently. Verifies that background merging keeps the live segment
    /// count bounded *during* indexing, converges to few segments afterwards,
    /// and never loses a document.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_segment_count_bounded_during_sustained_indexing() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, false);
        let schema = schema_builder.build();

        // Explicit tiering knobs (rather than a preset) so the segment bounds
        // asserted below are tied to known policy parameters.
        let policy = crate::merge::TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            tier_factor: 10.0,
            tier_floor: 50,
            max_merged_docs: 1_000_000,
        };

        let config = IndexConfig {
            // Tiny budget: every commit flushes its own segment.
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 1,
            merge_policy: Box::new(policy),
            max_concurrent_merges: 4,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_commits = 40;
        let docs_per_commit = 30;
        let total_docs = num_commits * docs_per_commit;
        // Peak number of live segments observed right after each commit.
        let mut max_segments_seen = 0usize;

        for commit_idx in 0..num_commits {
            for i in 0..docs_per_commit {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("doc_{} text", commit_idx * docs_per_commit + i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();

            // Give background merges a moment to make progress between commits.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let seg_count = writer.segment_manager.get_segment_ids().await.len();
            max_segments_seen = max_segments_seen.max(seg_count);
        }

        // Drain all outstanding merges before the final accounting.
        writer.wait_for_all_merges().await;

        let final_segments = writer.segment_manager.get_segment_ids().await.len();
        let final_docs: u64 = writer
            .segment_manager
            .read_metadata(|m| {
                m.segment_metas
                    .values()
                    .map(|s| s.num_docs as u64)
                    .sum::<u64>()
            })
            .await;

        eprintln!(
            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
            num_commits, total_docs, final_segments, max_segments_seen
        );

        // If merging keeps pace with indexing, the peak live segment count
        // stays well below the number of commits.
        let max_allowed = num_commits / 2;
        assert!(
            max_segments_seen <= max_allowed,
            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
            Merging is not keeping up.",
            max_segments_seen,
            max_allowed,
            num_commits
        );

        assert!(
            final_segments <= 10,
            "After all merges, expected ≤10 segments, got {}",
            final_segments
        );

        assert_eq!(
            final_docs, total_docs as u64,
            "Expected {} docs, metadata reports {}",
            total_docs, final_docs
        );
    }
1744}