1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39 index_documents_from_reader, index_json_document, parse_schema,
40};
41
42pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
45#[derive(Debug, Clone)]
47pub struct IndexConfig {
48 pub num_threads: usize,
50 pub num_indexing_threads: usize,
52 pub num_compression_threads: usize,
54 pub term_cache_blocks: usize,
56 pub store_cache_blocks: usize,
58 pub max_indexing_memory_bytes: usize,
60 pub merge_policy: Box<dyn crate::merge::MergePolicy>,
62 pub optimization: crate::structures::IndexOptimization,
64 pub reload_interval_ms: u64,
66}
67
68impl Default for IndexConfig {
69 fn default() -> Self {
70 #[cfg(feature = "native")]
71 let indexing_threads = crate::default_indexing_threads();
72 #[cfg(not(feature = "native"))]
73 let indexing_threads = 1;
74
75 #[cfg(feature = "native")]
76 let compression_threads = crate::default_compression_threads();
77 #[cfg(not(feature = "native"))]
78 let compression_threads = 1;
79
80 Self {
81 num_threads: indexing_threads,
82 num_indexing_threads: 1,
83 num_compression_threads: compression_threads,
84 term_cache_blocks: 256,
85 store_cache_blocks: 32,
86 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
88 optimization: crate::structures::IndexOptimization::default(),
89 reload_interval_ms: 1000, }
91 }
92}
93
94#[cfg(feature = "native")]
103pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
104 directory: Arc<D>,
105 schema: Arc<Schema>,
106 config: IndexConfig,
107 segment_manager: Arc<crate::merge::SegmentManager<D>>,
109 cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
111}
112
113#[cfg(feature = "native")]
114impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
115 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
117 let directory = Arc::new(directory);
118 let schema = Arc::new(schema);
119 let metadata = IndexMetadata::new((*schema).clone());
120
121 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
122 Arc::clone(&directory),
123 Arc::clone(&schema),
124 metadata,
125 config.merge_policy.clone_box(),
126 config.term_cache_blocks,
127 ));
128
129 segment_manager.update_metadata(|_| {}).await?;
131
132 Ok(Self {
133 directory,
134 schema,
135 config,
136 segment_manager,
137 cached_reader: tokio::sync::OnceCell::new(),
138 })
139 }
140
141 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
143 let directory = Arc::new(directory);
144
145 let metadata = IndexMetadata::load(directory.as_ref()).await?;
147 let schema = Arc::new(metadata.schema.clone());
148
149 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
150 Arc::clone(&directory),
151 Arc::clone(&schema),
152 metadata,
153 config.merge_policy.clone_box(),
154 config.term_cache_blocks,
155 ));
156
157 segment_manager.load_and_publish_trained().await;
159
160 Ok(Self {
161 directory,
162 schema,
163 config,
164 segment_manager,
165 cached_reader: tokio::sync::OnceCell::new(),
166 })
167 }
168
169 pub fn schema(&self) -> &Schema {
171 &self.schema
172 }
173
174 pub fn directory(&self) -> &D {
176 &self.directory
177 }
178
179 pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
181 &self.segment_manager
182 }
183
184 pub async fn reader(&self) -> Result<&IndexReader<D>> {
189 self.cached_reader
190 .get_or_try_init(|| async {
191 IndexReader::from_segment_manager(
192 Arc::clone(&self.schema),
193 Arc::clone(&self.segment_manager),
194 self.config.term_cache_blocks,
195 self.config.reload_interval_ms,
196 )
197 .await
198 })
199 .await
200 }
201
202 pub fn config(&self) -> &IndexConfig {
204 &self.config
205 }
206
207 pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
209 let reader = self.reader().await?;
210 let searcher = reader.searcher().await?;
211 Ok(searcher.segment_readers().to_vec())
212 }
213
214 pub async fn num_docs(&self) -> Result<u32> {
216 let reader = self.reader().await?;
217 let searcher = reader.searcher().await?;
218 Ok(searcher.num_docs())
219 }
220
221 pub fn default_fields(&self) -> Vec<crate::Field> {
223 if !self.schema.default_fields().is_empty() {
224 self.schema.default_fields().to_vec()
225 } else {
226 self.schema
227 .fields()
228 .filter(|(_, entry)| {
229 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
230 })
231 .map(|(field, _)| field)
232 .collect()
233 }
234 }
235
236 pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
238 Arc::new(crate::tokenizer::TokenizerRegistry::default())
239 }
240
241 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
243 let default_fields = self.default_fields();
244 let tokenizers = self.tokenizers();
245
246 let query_routers = self.schema.query_routers();
247 if !query_routers.is_empty()
248 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
249 {
250 return crate::dsl::QueryLanguageParser::with_router(
251 Arc::clone(&self.schema),
252 default_fields,
253 tokenizers,
254 router,
255 );
256 }
257
258 crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
259 }
260
261 pub async fn query(
263 &self,
264 query_str: &str,
265 limit: usize,
266 ) -> Result<crate::query::SearchResponse> {
267 self.query_offset(query_str, limit, 0).await
268 }
269
270 pub async fn query_offset(
272 &self,
273 query_str: &str,
274 limit: usize,
275 offset: usize,
276 ) -> Result<crate::query::SearchResponse> {
277 let parser = self.query_parser();
278 let query = parser
279 .parse(query_str)
280 .map_err(crate::error::Error::Query)?;
281 self.search_offset(query.as_ref(), limit, offset).await
282 }
283
284 pub async fn search(
286 &self,
287 query: &dyn crate::query::Query,
288 limit: usize,
289 ) -> Result<crate::query::SearchResponse> {
290 self.search_offset(query, limit, 0).await
291 }
292
293 pub async fn search_offset(
295 &self,
296 query: &dyn crate::query::Query,
297 limit: usize,
298 offset: usize,
299 ) -> Result<crate::query::SearchResponse> {
300 let reader = self.reader().await?;
301 let searcher = reader.searcher().await?;
302 let segments = searcher.segment_readers();
303
304 let fetch_limit = offset + limit;
305
306 let futures: Vec<_> = segments
307 .iter()
308 .map(|segment| {
309 let sid = segment.meta().id;
310 async move {
311 let results =
312 crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
313 Ok::<_, crate::error::Error>(
314 results
315 .into_iter()
316 .map(move |r| (sid, r))
317 .collect::<Vec<_>>(),
318 )
319 }
320 })
321 .collect();
322
323 let batches = futures::future::try_join_all(futures).await?;
324 let mut all_results: Vec<(u128, crate::query::SearchResult)> =
325 Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
326 for batch in batches {
327 all_results.extend(batch);
328 }
329
330 all_results.sort_by(|a, b| {
331 b.1.score
332 .partial_cmp(&a.1.score)
333 .unwrap_or(std::cmp::Ordering::Equal)
334 });
335
336 let total_hits = all_results.len() as u32;
337
338 let hits: Vec<crate::query::SearchHit> = all_results
339 .into_iter()
340 .skip(offset)
341 .take(limit)
342 .map(|(segment_id, result)| crate::query::SearchHit {
343 address: crate::query::DocAddress::new(segment_id, result.doc_id),
344 score: result.score,
345 matched_fields: result.extract_ordinals(),
346 })
347 .collect();
348
349 Ok(crate::query::SearchResponse { hits, total_hits })
350 }
351
352 pub async fn get_document(
354 &self,
355 address: &crate::query::DocAddress,
356 ) -> Result<Option<crate::dsl::Document>> {
357 let reader = self.reader().await?;
358 let searcher = reader.searcher().await?;
359 searcher.get_document(address).await
360 }
361
362 pub async fn get_postings(
364 &self,
365 field: crate::Field,
366 term: &[u8],
367 ) -> Result<
368 Vec<(
369 Arc<crate::segment::SegmentReader>,
370 crate::structures::BlockPostingList,
371 )>,
372 > {
373 let segments = self.segment_readers().await?;
374 let mut results = Vec::new();
375
376 for segment in segments {
377 if let Some(postings) = segment.get_postings(field, term).await? {
378 results.push((segment, postings));
379 }
380 }
381
382 Ok(results)
383 }
384}
385
386#[cfg(feature = "native")]
388impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
389 pub fn writer(&self) -> writer::IndexWriter<D> {
391 writer::IndexWriter::from_index(self)
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use super::*;
398 use crate::directories::RamDirectory;
399 use crate::dsl::{Document, SchemaBuilder};
400
401 #[tokio::test]
402 async fn test_index_create_and_search() {
403 let mut schema_builder = SchemaBuilder::default();
404 let title = schema_builder.add_text_field("title", true, true);
405 let body = schema_builder.add_text_field("body", true, true);
406 let schema = schema_builder.build();
407
408 let dir = RamDirectory::new();
409 let config = IndexConfig::default();
410
411 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
413 .await
414 .unwrap();
415
416 let mut doc1 = Document::new();
417 doc1.add_text(title, "Hello World");
418 doc1.add_text(body, "This is the first document");
419 writer.add_document(doc1).unwrap();
420
421 let mut doc2 = Document::new();
422 doc2.add_text(title, "Goodbye World");
423 doc2.add_text(body, "This is the second document");
424 writer.add_document(doc2).unwrap();
425
426 writer.commit().await.unwrap();
427
428 let index = Index::open(dir, config).await.unwrap();
430 assert_eq!(index.num_docs().await.unwrap(), 2);
431
432 let postings = index.get_postings(title, b"world").await.unwrap();
434 assert_eq!(postings.len(), 1); assert_eq!(postings[0].1.doc_count(), 2); let reader = index.reader().await.unwrap();
439 let searcher = reader.searcher().await.unwrap();
440 let doc = searcher.doc(0).await.unwrap().unwrap();
441 assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
442 }
443
444 #[tokio::test]
445 async fn test_multiple_segments() {
446 let mut schema_builder = SchemaBuilder::default();
447 let title = schema_builder.add_text_field("title", true, true);
448 let schema = schema_builder.build();
449
450 let dir = RamDirectory::new();
451 let config = IndexConfig {
452 max_indexing_memory_bytes: 1024, ..Default::default()
454 };
455
456 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
457 .await
458 .unwrap();
459
460 for batch in 0..3 {
462 for i in 0..5 {
463 let mut doc = Document::new();
464 doc.add_text(title, format!("Document {} batch {}", i, batch));
465 writer.add_document(doc).unwrap();
466 }
467 writer.commit().await.unwrap();
468 }
469
470 let index = Index::open(dir, config).await.unwrap();
472 assert_eq!(index.num_docs().await.unwrap(), 15);
473 assert!(
475 index.segment_readers().await.unwrap().len() >= 2,
476 "Expected multiple segments"
477 );
478 }
479
480 #[tokio::test]
481 async fn test_segment_merge() {
482 let mut schema_builder = SchemaBuilder::default();
483 let title = schema_builder.add_text_field("title", true, true);
484 let schema = schema_builder.build();
485
486 let dir = RamDirectory::new();
487 let config = IndexConfig {
488 max_indexing_memory_bytes: 512, ..Default::default()
490 };
491
492 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
493 .await
494 .unwrap();
495
496 for batch in 0..3 {
498 for i in 0..3 {
499 let mut doc = Document::new();
500 doc.add_text(title, format!("Document {} batch {}", i, batch));
501 writer.add_document(doc).unwrap();
502 }
503 writer.commit().await.unwrap();
504 }
505
506 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
508 assert!(
509 index.segment_readers().await.unwrap().len() >= 2,
510 "Expected multiple segments"
511 );
512
513 let mut writer = IndexWriter::open(dir.clone(), config.clone())
515 .await
516 .unwrap();
517 writer.force_merge().await.unwrap();
518
519 let index = Index::open(dir, config).await.unwrap();
521 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
522 assert_eq!(index.num_docs().await.unwrap(), 9);
523
524 let reader = index.reader().await.unwrap();
526 let searcher = reader.searcher().await.unwrap();
527 let mut found_docs = 0;
528 for i in 0..9 {
529 if searcher.doc(i).await.unwrap().is_some() {
530 found_docs += 1;
531 }
532 }
533 assert_eq!(found_docs, 9);
534 }
535
536 #[tokio::test]
537 async fn test_match_query() {
538 let mut schema_builder = SchemaBuilder::default();
539 let title = schema_builder.add_text_field("title", true, true);
540 let body = schema_builder.add_text_field("body", true, true);
541 let schema = schema_builder.build();
542
543 let dir = RamDirectory::new();
544 let config = IndexConfig::default();
545
546 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
547 .await
548 .unwrap();
549
550 let mut doc1 = Document::new();
551 doc1.add_text(title, "rust programming");
552 doc1.add_text(body, "Learn rust language");
553 writer.add_document(doc1).unwrap();
554
555 let mut doc2 = Document::new();
556 doc2.add_text(title, "python programming");
557 doc2.add_text(body, "Learn python language");
558 writer.add_document(doc2).unwrap();
559
560 writer.commit().await.unwrap();
561
562 let index = Index::open(dir, config).await.unwrap();
563
564 let results = index.query("rust", 10).await.unwrap();
566 assert_eq!(results.hits.len(), 1);
567
568 let results = index.query("rust programming", 10).await.unwrap();
570 assert!(!results.hits.is_empty());
571
572 let hit = &results.hits[0];
574 assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
575
576 let doc = index.get_document(&hit.address).await.unwrap().unwrap();
578 assert!(
579 !doc.field_values().is_empty(),
580 "Doc should have field values"
581 );
582
583 let reader = index.reader().await.unwrap();
585 let searcher = reader.searcher().await.unwrap();
586 let doc = searcher.doc(0).await.unwrap().unwrap();
587 assert!(
588 !doc.field_values().is_empty(),
589 "Doc should have field values"
590 );
591 }
592
593 #[tokio::test]
594 async fn test_slice_cache_warmup_and_load() {
595 use crate::directories::SliceCachingDirectory;
596
597 let mut schema_builder = SchemaBuilder::default();
598 let title = schema_builder.add_text_field("title", true, true);
599 let body = schema_builder.add_text_field("body", true, true);
600 let schema = schema_builder.build();
601
602 let dir = RamDirectory::new();
603 let config = IndexConfig::default();
604
605 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
607 .await
608 .unwrap();
609
610 for i in 0..10 {
611 let mut doc = Document::new();
612 doc.add_text(title, format!("Document {} about rust", i));
613 doc.add_text(body, format!("This is body text number {}", i));
614 writer.add_document(doc).unwrap();
615 }
616 writer.commit().await.unwrap();
617
618 let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
620 let index = Index::open(caching_dir, config.clone()).await.unwrap();
621
622 let results = index.query("rust", 10).await.unwrap();
624 assert!(!results.hits.is_empty());
625
626 let stats = index.directory.stats();
628 assert!(stats.total_bytes > 0, "Cache should have data after search");
629 }
630
631 #[tokio::test]
632 async fn test_multivalue_field_indexing_and_search() {
633 let mut schema_builder = SchemaBuilder::default();
634 let uris = schema_builder.add_text_field("uris", true, true);
635 let title = schema_builder.add_text_field("title", true, true);
636 let schema = schema_builder.build();
637
638 let dir = RamDirectory::new();
639 let config = IndexConfig::default();
640
641 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
643 .await
644 .unwrap();
645
646 let mut doc = Document::new();
647 doc.add_text(uris, "one");
648 doc.add_text(uris, "two");
649 doc.add_text(title, "Test Document");
650 writer.add_document(doc).unwrap();
651
652 let mut doc2 = Document::new();
654 doc2.add_text(uris, "three");
655 doc2.add_text(title, "Another Document");
656 writer.add_document(doc2).unwrap();
657
658 writer.commit().await.unwrap();
659
660 let index = Index::open(dir, config).await.unwrap();
662 assert_eq!(index.num_docs().await.unwrap(), 2);
663
664 let reader = index.reader().await.unwrap();
666 let searcher = reader.searcher().await.unwrap();
667 let doc = searcher.doc(0).await.unwrap().unwrap();
668 let all_uris: Vec<_> = doc.get_all(uris).collect();
669 assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
670 assert_eq!(all_uris[0].as_text(), Some("one"));
671 assert_eq!(all_uris[1].as_text(), Some("two"));
672
673 let json = doc.to_json(index.schema());
675 let uris_json = json.get("uris").unwrap();
676 assert!(uris_json.is_array(), "Multi-value field should be an array");
677 let uris_arr = uris_json.as_array().unwrap();
678 assert_eq!(uris_arr.len(), 2);
679 assert_eq!(uris_arr[0].as_str(), Some("one"));
680 assert_eq!(uris_arr[1].as_str(), Some("two"));
681
682 let results = index.query("uris:one", 10).await.unwrap();
684 assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
685 assert_eq!(results.hits[0].address.doc_id, 0);
686
687 let results = index.query("uris:two", 10).await.unwrap();
688 assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
689 assert_eq!(results.hits[0].address.doc_id, 0);
690
691 let results = index.query("uris:three", 10).await.unwrap();
692 assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
693 assert_eq!(results.hits[0].address.doc_id, 1);
694
695 let results = index.query("uris:nonexistent", 10).await.unwrap();
697 assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
698 }
699
700 #[tokio::test]
707 async fn test_wand_optimization_for_or_queries() {
708 use crate::query::{BooleanQuery, TermQuery};
709
710 let mut schema_builder = SchemaBuilder::default();
711 let content = schema_builder.add_text_field("content", true, true);
712 let schema = schema_builder.build();
713
714 let dir = RamDirectory::new();
715 let config = IndexConfig::default();
716
717 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
719 .await
720 .unwrap();
721
722 let mut doc = Document::new();
724 doc.add_text(content, "rust programming language is fast");
725 writer.add_document(doc).unwrap();
726
727 let mut doc = Document::new();
729 doc.add_text(content, "rust is a systems language");
730 writer.add_document(doc).unwrap();
731
732 let mut doc = Document::new();
734 doc.add_text(content, "programming is fun");
735 writer.add_document(doc).unwrap();
736
737 let mut doc = Document::new();
739 doc.add_text(content, "python is easy to learn");
740 writer.add_document(doc).unwrap();
741
742 let mut doc = Document::new();
744 doc.add_text(content, "rust rust programming programming systems");
745 writer.add_document(doc).unwrap();
746
747 writer.commit().await.unwrap();
748
749 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
751
752 let or_query = BooleanQuery::new()
754 .should(TermQuery::text(content, "rust"))
755 .should(TermQuery::text(content, "programming"));
756
757 let results = index.search(&or_query, 10).await.unwrap();
758
759 assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
761
762 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
763 assert!(doc_ids.contains(&0), "Should find doc 0");
764 assert!(doc_ids.contains(&1), "Should find doc 1");
765 assert!(doc_ids.contains(&2), "Should find doc 2");
766 assert!(doc_ids.contains(&4), "Should find doc 4");
767 assert!(
768 !doc_ids.contains(&3),
769 "Should NOT find doc 3 (only has 'python')"
770 );
771
772 let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
774
775 let results = index.search(&single_query, 10).await.unwrap();
776 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
777
778 let must_query = BooleanQuery::new()
780 .must(TermQuery::text(content, "rust"))
781 .should(TermQuery::text(content, "programming"));
782
783 let results = index.search(&must_query, 10).await.unwrap();
784 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
786
787 let must_not_query = BooleanQuery::new()
789 .should(TermQuery::text(content, "rust"))
790 .should(TermQuery::text(content, "programming"))
791 .must_not(TermQuery::text(content, "systems"));
792
793 let results = index.search(&must_not_query, 10).await.unwrap();
794 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
796 assert!(
797 !doc_ids.contains(&1),
798 "Should NOT find doc 1 (has 'systems')"
799 );
800 assert!(
801 !doc_ids.contains(&4),
802 "Should NOT find doc 4 (has 'systems')"
803 );
804
805 let or_query = BooleanQuery::new()
807 .should(TermQuery::text(content, "rust"))
808 .should(TermQuery::text(content, "programming"));
809
810 let results = index.search(&or_query, 2).await.unwrap();
811 assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
812
813 }
816
817 #[tokio::test]
819 async fn test_wand_results_match_standard_boolean() {
820 use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
821
822 let mut schema_builder = SchemaBuilder::default();
823 let content = schema_builder.add_text_field("content", true, true);
824 let schema = schema_builder.build();
825
826 let dir = RamDirectory::new();
827 let config = IndexConfig::default();
828
829 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
830 .await
831 .unwrap();
832
833 for i in 0..10 {
835 let mut doc = Document::new();
836 let text = match i % 4 {
837 0 => "apple banana cherry",
838 1 => "apple orange",
839 2 => "banana grape",
840 _ => "cherry date",
841 };
842 doc.add_text(content, text);
843 writer.add_document(doc).unwrap();
844 }
845
846 writer.commit().await.unwrap();
847 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
848
849 let wand_query = WandOrQuery::new(content).term("apple").term("banana");
851
852 let bool_query = BooleanQuery::new()
853 .should(TermQuery::text(content, "apple"))
854 .should(TermQuery::text(content, "banana"));
855
856 let wand_results = index.search(&wand_query, 10).await.unwrap();
857 let bool_results = index.search(&bool_query, 10).await.unwrap();
858
859 assert_eq!(
861 wand_results.hits.len(),
862 bool_results.hits.len(),
863 "WAND and Boolean should find same number of docs"
864 );
865
866 let wand_docs: std::collections::HashSet<u32> =
867 wand_results.hits.iter().map(|h| h.address.doc_id).collect();
868 let bool_docs: std::collections::HashSet<u32> =
869 bool_results.hits.iter().map(|h| h.address.doc_id).collect();
870
871 assert_eq!(
872 wand_docs, bool_docs,
873 "WAND and Boolean should find same documents"
874 );
875 }
876
877 #[tokio::test]
878 async fn test_vector_index_threshold_switch() {
879 use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
880
881 let mut schema_builder = SchemaBuilder::default();
883 let title = schema_builder.add_text_field("title", true, true);
884 let embedding = schema_builder.add_dense_vector_field_with_config(
885 "embedding",
886 true, true, DenseVectorConfig {
889 dim: 8,
890 index_type: VectorIndexType::IvfRaBitQ,
891 quantization: DenseVectorQuantization::F32,
892 num_clusters: Some(4), nprobe: 2,
894 build_threshold: Some(50), },
896 );
897 let schema = schema_builder.build();
898
899 let dir = RamDirectory::new();
900 let config = IndexConfig::default();
901
902 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
904 .await
905 .unwrap();
906
907 for i in 0..30 {
909 let mut doc = Document::new();
910 doc.add_text(title, format!("Document {}", i));
911 let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
913 doc.add_dense_vector(embedding, vec);
914 writer.add_document(doc).unwrap();
915 }
916 writer.commit().await.unwrap();
917
918 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
920 assert!(
921 index.segment_manager.trained().is_none(),
922 "Should not have trained centroids below threshold"
923 );
924
925 let query_vec: Vec<f32> = vec![0.5; 8];
927 let segments = index.segment_readers().await.unwrap();
928 assert!(!segments.is_empty());
929
930 let results = segments[0]
931 .search_dense_vector(
932 embedding,
933 &query_vec,
934 5,
935 0,
936 1,
937 crate::query::MultiValueCombiner::Max,
938 )
939 .await
940 .unwrap();
941 assert!(!results.is_empty(), "Flat search should return results");
942
943 let mut writer = IndexWriter::open(dir.clone(), config.clone())
945 .await
946 .unwrap();
947
948 for i in 30..60 {
950 let mut doc = Document::new();
951 doc.add_text(title, format!("Document {}", i));
952 let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
953 doc.add_dense_vector(embedding, vec);
954 writer.add_document(doc).unwrap();
955 }
956 writer.commit().await.unwrap();
957
958 writer.build_vector_index().await.unwrap();
960
961 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
963 assert!(
964 index.segment_manager.trained().is_some(),
965 "Should have loaded trained centroids for embedding field"
966 );
967
968 let segments = index.segment_readers().await.unwrap();
970 let results = segments[0]
971 .search_dense_vector(
972 embedding,
973 &query_vec,
974 5,
975 0,
976 1,
977 crate::query::MultiValueCombiner::Max,
978 )
979 .await
980 .unwrap();
981 assert!(
982 !results.is_empty(),
983 "Search should return results after build"
984 );
985
986 let writer = IndexWriter::open(dir.clone(), config.clone())
988 .await
989 .unwrap();
990 writer.build_vector_index().await.unwrap(); assert!(writer.segment_manager.trained().is_some());
994 }
995
996 #[tokio::test]
999 async fn test_multi_round_merge_with_search() {
1000 let mut schema_builder = SchemaBuilder::default();
1001 let title = schema_builder.add_text_field("title", true, true);
1002 let body = schema_builder.add_text_field("body", true, true);
1003 let schema = schema_builder.build();
1004
1005 let dir = RamDirectory::new();
1006 let config = IndexConfig {
1007 max_indexing_memory_bytes: 512,
1008 ..Default::default()
1009 };
1010
1011 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1013 .await
1014 .unwrap();
1015
1016 for batch in 0..5 {
1017 for i in 0..10 {
1018 let mut doc = Document::new();
1019 doc.add_text(
1020 title,
1021 format!("alpha bravo charlie batch{} doc{}", batch, i),
1022 );
1023 doc.add_text(
1024 body,
1025 format!("the quick brown fox jumps over the lazy dog number {}", i),
1026 );
1027 writer.add_document(doc).unwrap();
1028 }
1029 writer.commit().await.unwrap();
1030 }
1031
1032 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1033 let pre_merge_segments = index.segment_readers().await.unwrap().len();
1034 assert!(
1035 pre_merge_segments >= 3,
1036 "Expected >=3 segments, got {}",
1037 pre_merge_segments
1038 );
1039 assert_eq!(index.num_docs().await.unwrap(), 50);
1040
1041 let results = index.query("alpha", 100).await.unwrap();
1043 assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1044
1045 let results = index.query("fox", 100).await.unwrap();
1046 assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1047
1048 let mut writer = IndexWriter::open(dir.clone(), config.clone())
1050 .await
1051 .unwrap();
1052 writer.force_merge().await.unwrap();
1053
1054 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1055 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1056 assert_eq!(index.num_docs().await.unwrap(), 50);
1057
1058 let results = index.query("alpha", 100).await.unwrap();
1060 assert_eq!(
1061 results.hits.len(),
1062 50,
1063 "all 50 docs should match 'alpha' after merge 1"
1064 );
1065
1066 let results = index.query("fox", 100).await.unwrap();
1067 assert_eq!(
1068 results.hits.len(),
1069 50,
1070 "all 50 docs should match 'fox' after merge 1"
1071 );
1072
1073 let reader1 = index.reader().await.unwrap();
1075 let searcher1 = reader1.searcher().await.unwrap();
1076 for i in 0..50 {
1077 let doc = searcher1.doc(i).await.unwrap();
1078 assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1079 }
1080
1081 let mut writer = IndexWriter::open(dir.clone(), config.clone())
1083 .await
1084 .unwrap();
1085 for batch in 0..3 {
1086 for i in 0..10 {
1087 let mut doc = Document::new();
1088 doc.add_text(
1089 title,
1090 format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1091 );
1092 doc.add_text(
1093 body,
1094 format!("the quick brown fox jumps again number {}", i),
1095 );
1096 writer.add_document(doc).unwrap();
1097 }
1098 writer.commit().await.unwrap();
1099 }
1100
1101 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1102 assert_eq!(index.num_docs().await.unwrap(), 80);
1103 assert!(
1104 index.segment_readers().await.unwrap().len() >= 2,
1105 "Should have >=2 segments after round 2 ingestion"
1106 );
1107
1108 let results = index.query("fox", 100).await.unwrap();
1110 assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1111
1112 let results = index.query("alpha", 100).await.unwrap();
1113 assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1114
1115 let results = index.query("delta", 100).await.unwrap();
1116 assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1117
1118 let mut writer = IndexWriter::open(dir.clone(), config.clone())
1120 .await
1121 .unwrap();
1122 writer.force_merge().await.unwrap();
1123
1124 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1125 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1126 assert_eq!(index.num_docs().await.unwrap(), 80);
1127
1128 let results = index.query("fox", 100).await.unwrap();
1130 assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1131
1132 let results = index.query("alpha", 100).await.unwrap();
1133 assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1134
1135 let results = index.query("delta", 100).await.unwrap();
1136 assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1137
1138 let reader2 = index.reader().await.unwrap();
1140 let searcher2 = reader2.searcher().await.unwrap();
1141 for i in 0..80 {
1142 let doc = searcher2.doc(i).await.unwrap();
1143 assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1144 }
1145 }
1146
1147 #[tokio::test]
1150 async fn test_large_scale_merge_correctness() {
1151 let mut schema_builder = SchemaBuilder::default();
1152 let title = schema_builder.add_text_field("title", true, true);
1153 let schema = schema_builder.build();
1154
1155 let dir = RamDirectory::new();
1156 let config = IndexConfig {
1157 max_indexing_memory_bytes: 512,
1158 ..Default::default()
1159 };
1160
1161 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1162 .await
1163 .unwrap();
1164
1165 let total_docs = 200u32;
1168 for batch in 0..8 {
1169 for i in 0..25 {
1170 let mut doc = Document::new();
1171 doc.add_text(
1172 title,
1173 format!("common shared term unique_{} item{}", batch, i),
1174 );
1175 writer.add_document(doc).unwrap();
1176 }
1177 writer.commit().await.unwrap();
1178 }
1179
1180 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1182 assert_eq!(index.num_docs().await.unwrap(), total_docs);
1183
1184 let results = index.query("common", 300).await.unwrap();
1185 assert_eq!(
1186 results.hits.len(),
1187 total_docs as usize,
1188 "all docs should match 'common'"
1189 );
1190
1191 for batch in 0..8 {
1193 let q = format!("unique_{}", batch);
1194 let results = index.query(&q, 100).await.unwrap();
1195 assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1196 }
1197
1198 let mut writer = IndexWriter::open(dir.clone(), config.clone())
1200 .await
1201 .unwrap();
1202 writer.force_merge().await.unwrap();
1203
1204 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1206 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1207 assert_eq!(index.num_docs().await.unwrap(), total_docs);
1208
1209 let results = index.query("common", 300).await.unwrap();
1210 assert_eq!(results.hits.len(), total_docs as usize);
1211
1212 for batch in 0..8 {
1213 let q = format!("unique_{}", batch);
1214 let results = index.query(&q, 100).await.unwrap();
1215 assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1216 }
1217
1218 let reader = index.reader().await.unwrap();
1220 let searcher = reader.searcher().await.unwrap();
1221 for i in 0..total_docs {
1222 let doc = searcher.doc(i).await.unwrap();
1223 assert!(doc.is_some(), "doc {} missing after merge", i);
1224 }
1225 }
1226
1227 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1231 async fn test_auto_merge_triggered() {
1232 use crate::directories::MmapDirectory;
1233 let tmp_dir = tempfile::tempdir().unwrap();
1234 let dir = MmapDirectory::new(tmp_dir.path());
1235
1236 let mut schema_builder = SchemaBuilder::default();
1237 let title = schema_builder.add_text_field("title", true, true);
1238 let body = schema_builder.add_text_field("body", true, true);
1239 let schema = schema_builder.build();
1240
1241 let config = IndexConfig {
1243 max_indexing_memory_bytes: 4096,
1244 num_indexing_threads: 4,
1245 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1246 ..Default::default()
1247 };
1248
1249 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1250 .await
1251 .unwrap();
1252
1253 for batch in 0..12 {
1255 for i in 0..50 {
1256 let mut doc = Document::new();
1257 doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1258 doc.add_text(
1259 body,
1260 format!(
1261 "the quick brown fox jumps over lazy dog number {} round {}",
1262 i, batch
1263 ),
1264 );
1265 writer.add_document(doc).unwrap();
1266 }
1267 writer.commit().await.unwrap();
1268 }
1269
1270 let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1271
1272 writer.wait_for_merging_thread().await;
1275 writer.maybe_merge().await;
1276 writer.wait_for_merging_thread().await;
1277
1278 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1280 let segment_count = index.segment_readers().await.unwrap().len();
1281 eprintln!(
1282 "Segments: {} before merge, {} after auto-merge",
1283 pre_merge, segment_count
1284 );
1285 assert!(
1286 segment_count < pre_merge,
1287 "Expected auto-merge to reduce segments from {}, got {}",
1288 pre_merge,
1289 segment_count
1290 );
1291 }
1292
1293 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1298 async fn test_commit_with_vectors_and_background_merge() {
1299 use crate::directories::MmapDirectory;
1300 use crate::dsl::DenseVectorConfig;
1301
1302 let tmp_dir = tempfile::tempdir().unwrap();
1303 let dir = MmapDirectory::new(tmp_dir.path());
1304
1305 let mut schema_builder = SchemaBuilder::default();
1306 let title = schema_builder.add_text_field("title", true, true);
1307 let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1309 let embedding =
1310 schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1311 let schema = schema_builder.build();
1312
1313 let config = IndexConfig {
1315 max_indexing_memory_bytes: 4096,
1316 num_indexing_threads: 4,
1317 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1318 ..Default::default()
1319 };
1320
1321 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1322 .await
1323 .unwrap();
1324
1325 for batch in 0..12 {
1328 for i in 0..5 {
1329 let mut doc = Document::new();
1330 doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1331 let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1333 doc.add_dense_vector(embedding, vec);
1334 writer.add_document(doc).unwrap();
1335 }
1336 writer.commit().await.unwrap();
1337 }
1338 writer.wait_for_merging_thread().await;
1339
1340 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1341 let num_docs = index.num_docs().await.unwrap();
1342 assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1343 }
1344
1345 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1348 async fn test_force_merge_many_segments() {
1349 use crate::directories::MmapDirectory;
1350 let tmp_dir = tempfile::tempdir().unwrap();
1351 let dir = MmapDirectory::new(tmp_dir.path());
1352
1353 let mut schema_builder = SchemaBuilder::default();
1354 let title = schema_builder.add_text_field("title", true, true);
1355 let schema = schema_builder.build();
1356
1357 let config = IndexConfig {
1358 max_indexing_memory_bytes: 512,
1359 ..Default::default()
1360 };
1361
1362 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1363 .await
1364 .unwrap();
1365
1366 for batch in 0..50 {
1368 for i in 0..3 {
1369 let mut doc = Document::new();
1370 doc.add_text(title, format!("term_{} batch_{}", i, batch));
1371 writer.add_document(doc).unwrap();
1372 }
1373 writer.commit().await.unwrap();
1374 }
1375 writer.wait_for_merging_thread().await;
1377
1378 let seg_ids = writer.segment_manager.get_segment_ids().await;
1379 let pre = seg_ids.len();
1380 eprintln!("Segments before force_merge: {}", pre);
1381 assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1382
1383 writer.force_merge().await.unwrap();
1385
1386 let index2 = Index::open(dir, config).await.unwrap();
1387 let post = index2.segment_readers().await.unwrap().len();
1388 eprintln!("Segments after force_merge: {}", post);
1389 assert_eq!(post, 1);
1390 assert_eq!(index2.num_docs().await.unwrap(), 150);
1391 }
1392
1393 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1397 async fn test_background_merge_generation() {
1398 use crate::directories::MmapDirectory;
1399 let tmp_dir = tempfile::tempdir().unwrap();
1400 let dir = MmapDirectory::new(tmp_dir.path());
1401
1402 let mut schema_builder = SchemaBuilder::default();
1403 let title = schema_builder.add_text_field("title", true, true);
1404 let schema = schema_builder.build();
1405
1406 let config = IndexConfig {
1407 max_indexing_memory_bytes: 4096,
1408 num_indexing_threads: 2,
1409 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1410 ..Default::default()
1411 };
1412
1413 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1414 .await
1415 .unwrap();
1416
1417 for batch in 0..15 {
1419 for i in 0..5 {
1420 let mut doc = Document::new();
1421 doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1422 writer.add_document(doc).unwrap();
1423 }
1424 writer.commit().await.unwrap();
1425 }
1426 writer.wait_for_merging_thread().await;
1427
1428 let metas = writer
1430 .segment_manager
1431 .read_metadata(|m| m.segment_metas.clone())
1432 .await;
1433
1434 let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1435 eprintln!(
1436 "Segments after merge: {}, max generation: {}",
1437 metas.len(),
1438 max_gen
1439 );
1440
1441 assert!(
1443 max_gen >= 1,
1444 "Expected at least one merged segment (gen >= 1), got max_gen={}",
1445 max_gen
1446 );
1447
1448 for (id, info) in &metas {
1450 if info.generation > 0 {
1451 assert!(
1452 !info.ancestors.is_empty(),
1453 "Segment {} has gen={} but no ancestors",
1454 id,
1455 info.generation
1456 );
1457 } else {
1458 assert!(
1459 info.ancestors.is_empty(),
1460 "Fresh segment {} has gen=0 but has ancestors",
1461 id
1462 );
1463 }
1464 }
1465 }
1466
1467 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1471 async fn test_merge_preserves_all_documents() {
1472 use crate::directories::MmapDirectory;
1473 let tmp_dir = tempfile::tempdir().unwrap();
1474 let dir = MmapDirectory::new(tmp_dir.path());
1475
1476 let mut schema_builder = SchemaBuilder::default();
1477 let title = schema_builder.add_text_field("title", true, true);
1478 let schema = schema_builder.build();
1479
1480 let config = IndexConfig {
1481 max_indexing_memory_bytes: 4096,
1482 ..Default::default()
1483 };
1484
1485 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1486 .await
1487 .unwrap();
1488
1489 let total_docs = 1200;
1490 let docs_per_batch = 60;
1491 let batches = total_docs / docs_per_batch;
1492
1493 for batch in 0..batches {
1495 for i in 0..docs_per_batch {
1496 let doc_num = batch * docs_per_batch + i;
1497 let mut doc = Document::new();
1498 doc.add_text(
1499 title,
1500 format!("uid_{} common_term batch_{}", doc_num, batch),
1501 );
1502 writer.add_document(doc).unwrap();
1503 }
1504 writer.commit().await.unwrap();
1505 }
1506
1507 let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1508 assert!(
1509 pre_segments >= 2,
1510 "Need multiple segments, got {}",
1511 pre_segments
1512 );
1513
1514 writer.force_merge().await.unwrap();
1516
1517 let index = Index::open(dir, config).await.unwrap();
1518 assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1519 assert_eq!(
1520 index.num_docs().await.unwrap(),
1521 total_docs as u32,
1522 "Doc count mismatch after force_merge"
1523 );
1524
1525 let results = index.query("common_term", total_docs + 100).await.unwrap();
1527 assert_eq!(
1528 results.hits.len(),
1529 total_docs,
1530 "common_term should match all docs"
1531 );
1532
1533 for check in [0, 1, total_docs / 2, total_docs - 1] {
1535 let q = format!("uid_{}", check);
1536 let results = index.query(&q, 10).await.unwrap();
1537 assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1538 }
1539 }
1540
1541 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1544 async fn test_multi_round_merge_doc_integrity() {
1545 use crate::directories::MmapDirectory;
1546 let tmp_dir = tempfile::tempdir().unwrap();
1547 let dir = MmapDirectory::new(tmp_dir.path());
1548
1549 let mut schema_builder = SchemaBuilder::default();
1550 let title = schema_builder.add_text_field("title", true, true);
1551 let schema = schema_builder.build();
1552
1553 let config = IndexConfig {
1554 max_indexing_memory_bytes: 4096,
1555 num_indexing_threads: 2,
1556 merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1557 ..Default::default()
1558 };
1559
1560 let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1561 .await
1562 .unwrap();
1563
1564 let mut expected_total = 0u64;
1565
1566 for round in 0..4 {
1568 let docs_this_round = 50 + round * 25; for batch in 0..5 {
1570 for i in 0..docs_this_round / 5 {
1571 let mut doc = Document::new();
1572 doc.add_text(
1573 title,
1574 format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
1575 );
1576 writer.add_document(doc).unwrap();
1577 }
1578 writer.commit().await.unwrap();
1579 }
1580 writer.wait_for_merging_thread().await;
1581
1582 expected_total += docs_this_round as u64;
1583
1584 let actual = writer
1585 .segment_manager
1586 .read_metadata(|m| {
1587 m.segment_metas
1588 .values()
1589 .map(|s| s.num_docs as u64)
1590 .sum::<u64>()
1591 })
1592 .await;
1593
1594 assert_eq!(
1595 actual, expected_total,
1596 "Round {}: expected {} docs, metadata reports {}",
1597 round, expected_total, actual
1598 );
1599 }
1600
1601 let index = Index::open(dir, config).await.unwrap();
1603 assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);
1604
1605 let results = index
1606 .query("searchable", expected_total as usize + 100)
1607 .await
1608 .unwrap();
1609 assert_eq!(
1610 results.hits.len(),
1611 expected_total as usize,
1612 "All docs should match 'searchable'"
1613 );
1614
1615 let metas = index
1617 .segment_manager()
1618 .read_metadata(|m| m.segment_metas.clone())
1619 .await;
1620 let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1621 eprintln!(
1622 "Final: {} segments, {} docs, max generation={}",
1623 metas.len(),
1624 expected_total,
1625 max_gen
1626 );
1627 assert!(
1628 max_gen >= 1,
1629 "Multiple merge rounds should produce gen >= 1"
1630 );
1631 }
1632}