// hermes_core/index/mod.rs
1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::IndexWriter;
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39    index_documents_from_reader, index_json_document, parse_schema,
40};
41
/// Default file name for the slice cache
///
/// NOTE(review): slice-cache warmup/save is currently not wired up
/// (see the file-level TODO about `warmup_and_save_slice_cache`).
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
/// Index configuration
///
/// A cloneable bundle of tuning knobs shared by `Index`, writers, and readers.
/// NOTE(review): `#[derive(Clone)]` with a `Box<dyn MergePolicy>` field relies
/// on a `Clone` impl for the boxed trait object elsewhere in the crate
/// (policies expose `clone_box()`) — confirm.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max memory (bytes) across all builders before auto-commit (global limit)
    pub max_indexing_memory_bytes: usize,
    /// Merge policy for background segment merging
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
    pub reload_interval_ms: u64,
}
67
68impl Default for IndexConfig {
69    fn default() -> Self {
70        #[cfg(feature = "native")]
71        let cpus = num_cpus::get().max(1);
72        #[cfg(not(feature = "native"))]
73        let cpus = 1;
74
75        Self {
76            num_threads: cpus,
77            num_indexing_threads: 1,
78            num_compression_threads: cpus,
79            term_cache_blocks: 256,
80            store_cache_blocks: 32,
81            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
82            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
83            optimization: crate::structures::IndexOptimization::default(),
84            reload_interval_ms: 1000, // 1 second default
85        }
86    }
87}
88
/// Multi-segment async Index
///
/// The central concept for search. Owns segment lifecycle and provides:
/// - `Index::create()` / `Index::open()` - create or open an index
/// - `index.writer()` - get an IndexWriter for adding documents
/// - `index.reader()` - get an IndexReader for searching with reload policy
///
/// All segment management is delegated to SegmentManager.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Backing directory, shared with the segment manager and readers
    directory: Arc<D>,
    /// Index schema (also embedded in the persisted `IndexMetadata`)
    schema: Arc<Schema>,
    /// Configuration captured at create/open time
    config: IndexConfig,
    /// Segment manager - owns segments, tracker, metadata, and trained structures
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Cached reader (created lazily, reused across calls)
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
107
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Create a new index in the directory
    ///
    /// Persists the initial (empty) metadata so a subsequent `open()`
    /// succeeds even before any documents are committed.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Save initial metadata (the identity closure forces a write of the
        // freshly-created state without changing it)
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Open an existing index from a directory
    ///
    /// The schema is not a parameter here: it is recovered from the
    /// persisted metadata written by `create()`/commits.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load metadata (includes schema)
        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Load trained structures into SegmentManager's ArcSwap
        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get a reference to the underlying directory
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Get the segment manager
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

    /// Get an IndexReader for searching (with reload policy)
    ///
    /// The reader is cached and reused across calls. The reader's internal
    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
    ///
    /// If initialization fails, the `OnceCell` stays empty and a later call
    /// will retry (per `get_or_try_init` semantics).
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    /// Get the config
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    /// Get segment readers for query execution (convenience method)
    ///
    /// Snapshot of the current searcher's segments; clones the `Arc`s, not
    /// the segments themselves.
    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    /// Total number of documents across all segments
    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

    /// Get default fields for search
    ///
    /// Uses the schema's declared default fields when present; otherwise
    /// falls back to every indexed text field.
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Get tokenizer registry
    ///
    /// NOTE(review): allocates a fresh default registry on every call rather
    /// than caching one on the Index — presumably cheap, but confirm.
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

    /// Create a query parser for this index
    ///
    /// If the schema declares query-router rules and they compile, the parser
    /// is built with that router; otherwise a plain parser is returned.
    /// Router construction errors are silently ignored (falls through).
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

    /// Parse and search using a query string
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Query with offset for pagination
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

    /// Search and return results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Search with offset for pagination
    ///
    /// Fans the query out over all segments concurrently, merges the
    /// per-segment hits, sorts by descending score, and pages the result.
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

        // Each segment must return enough hits to fill the page after the
        // global merge (offset + limit).
        let fetch_limit = offset + limit;

        // One future per segment; results are tagged with the segment id so
        // hits stay addressable after merging.
        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        // Fails fast on the first segment error.
        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        // Descending score; NaN scores compare as equal rather than panicking.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        // NOTE(review): this counts collected hits, which are capped at
        // fetch_limit per segment — not the exact global match count.
        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get a document by its unique address
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

    /// Reload is no longer needed - reader handles this automatically
    ///
    /// Kept as a no-op for backward compatibility with older callers.
    pub async fn reload(&self) -> Result<()> {
        // No-op - reader reloads automatically based on policy
        Ok(())
    }

    /// Get posting lists for a term across all segments
    ///
    /// Segments that do not contain the term are skipped; each hit is paired
    /// with its segment reader so callers can resolve doc ids.
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}
386
/// Native-only methods for Index
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Get an IndexWriter for adding documents
    ///
    /// Constructs a new writer on every call, backed by this index's
    /// segment manager and config (see `IndexWriter::from_index`).
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}
395
396// TODO: Add back warmup_and_save_slice_cache when slice caching is re-integrated
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use crate::directories::RamDirectory;
402    use crate::dsl::{Document, SchemaBuilder};
403
404    #[tokio::test]
405    async fn test_index_create_and_search() {
406        let mut schema_builder = SchemaBuilder::default();
407        let title = schema_builder.add_text_field("title", true, true);
408        let body = schema_builder.add_text_field("body", true, true);
409        let schema = schema_builder.build();
410
411        let dir = RamDirectory::new();
412        let config = IndexConfig::default();
413
414        // Create index and add documents
415        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
416            .await
417            .unwrap();
418
419        let mut doc1 = Document::new();
420        doc1.add_text(title, "Hello World");
421        doc1.add_text(body, "This is the first document");
422        writer.add_document(doc1).unwrap();
423
424        let mut doc2 = Document::new();
425        doc2.add_text(title, "Goodbye World");
426        doc2.add_text(body, "This is the second document");
427        writer.add_document(doc2).unwrap();
428
429        writer.commit().await.unwrap();
430
431        // Open for reading
432        let index = Index::open(dir, config).await.unwrap();
433        assert_eq!(index.num_docs().await.unwrap(), 2);
434
435        // Check postings
436        let postings = index.get_postings(title, b"world").await.unwrap();
437        assert_eq!(postings.len(), 1); // One segment
438        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
439
440        // Retrieve document via searcher snapshot
441        let reader = index.reader().await.unwrap();
442        let searcher = reader.searcher().await.unwrap();
443        let doc = searcher.doc(0).await.unwrap().unwrap();
444        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
445    }
446
447    #[tokio::test]
448    async fn test_multiple_segments() {
449        let mut schema_builder = SchemaBuilder::default();
450        let title = schema_builder.add_text_field("title", true, true);
451        let schema = schema_builder.build();
452
453        let dir = RamDirectory::new();
454        let config = IndexConfig {
455            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
456            ..Default::default()
457        };
458
459        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
460            .await
461            .unwrap();
462
463        // Add documents in batches to create multiple segments
464        for batch in 0..3 {
465            for i in 0..5 {
466                let mut doc = Document::new();
467                doc.add_text(title, format!("Document {} batch {}", i, batch));
468                writer.add_document(doc).unwrap();
469            }
470            writer.commit().await.unwrap();
471        }
472
473        // Open and check
474        let index = Index::open(dir, config).await.unwrap();
475        assert_eq!(index.num_docs().await.unwrap(), 15);
476        // With queue-based indexing, exact segment count varies
477        assert!(
478            index.segment_readers().await.unwrap().len() >= 2,
479            "Expected multiple segments"
480        );
481    }
482
    /// End-to-end check that `force_merge` collapses many flushed segments
    /// into one without losing documents.
    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments by flushing between batches
        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        // Should have multiple segments (at least 2, one per flush with docs)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        // Force merge via a freshly opened writer
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now, still holding all 9 documents
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        // Verify all documents accessible (order may vary with queue-based indexing)
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }
539
    /// Exercises the string-query path (`index.query`) over multiple default
    /// fields, plus document retrieval by hit address and by searcher doc id.
    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // Test match query with multiple default fields
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        // Test match query with multiple tokens
        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Verify hit has address (segment_id + doc_id)
        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Verify document retrieval by address
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // Also verify doc retrieval via searcher snapshot
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }
596
    /// Verifies that reads performed during a search populate a
    /// `SliceCachingDirectory` wrapped around the base directory.
    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with some documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Open with slice caching (1 MiB budget) and perform some operations
        // to warm up the cache
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        // Perform a search to warm up the cache
        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Check cache stats - should have cached some data.
        // Note: direct `index.directory` field access works because this test
        // module lives in the same module as `Index`.
        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }
634
    /// Multi-value text fields: all values must be stored, serialized as a
    /// JSON array, and individually searchable.
    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add document with multi-value field
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        // Add another document with different uris
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Verify document retrieval preserves all values (and their order)
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // Verify to_json returns array for multi-value field
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Verify both values are searchable via field-qualified queries
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        // Verify searching for non-existent value returns no results
        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
703
    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
    ///
    /// This test verifies that:
    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
    /// 2. Search results are correct regardless of WAND optimization
    /// 3. Scores are reasonable for matching documents
    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with documents containing various terms
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Doc 0: contains "rust" and "programming"
        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        // Doc 1: contains "rust" only
        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        // Doc 2: contains "programming" only
        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        // Doc 3: contains "python" (neither rust nor programming)
        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        // Doc 4: contains both "rust" and "programming" multiple times
        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        // Test 2: Single term query (should NOT use WAND, but still work)
        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 3: Query with MUST (should NOT use WAND)
        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        // Must have "rust", optionally "programming"
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 4: Query with MUST_NOT (should NOT use WAND)
        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        // Should exclude docs with "systems" (doc 1 and 4)
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // Test 5: Verify top-k limit works correctly with WAND
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");

        // Top results should be docs that match both terms (higher scores)
        // Doc 0 and 4 contain both "rust" and "programming"
    }
820
821    /// Test that WAND optimization produces same results as non-WAND for correctness
822    #[tokio::test]
823    async fn test_wand_results_match_standard_boolean() {
824        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
825
826        let mut schema_builder = SchemaBuilder::default();
827        let content = schema_builder.add_text_field("content", true, true);
828        let schema = schema_builder.build();
829
830        let dir = RamDirectory::new();
831        let config = IndexConfig::default();
832
833        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
834            .await
835            .unwrap();
836
837        // Add several documents
838        for i in 0..10 {
839            let mut doc = Document::new();
840            let text = match i % 4 {
841                0 => "apple banana cherry",
842                1 => "apple orange",
843                2 => "banana grape",
844                _ => "cherry date",
845            };
846            doc.add_text(content, text);
847            writer.add_document(doc).unwrap();
848        }
849
850        writer.commit().await.unwrap();
851        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
852
853        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
854        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
855
856        let bool_query = BooleanQuery::new()
857            .should(TermQuery::text(content, "apple"))
858            .should(TermQuery::text(content, "banana"));
859
860        let wand_results = index.search(&wand_query, 10).await.unwrap();
861        let bool_results = index.search(&bool_query, 10).await.unwrap();
862
863        // Both should find the same documents
864        assert_eq!(
865            wand_results.hits.len(),
866            bool_results.hits.len(),
867            "WAND and Boolean should find same number of docs"
868        );
869
870        let wand_docs: std::collections::HashSet<u32> =
871            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
872        let bool_docs: std::collections::HashSet<u32> =
873            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
874
875        assert_eq!(
876            wand_docs, bool_docs,
877            "WAND and Boolean should find same documents"
878        );
879    }
880
    /// Exercises the vector-index build threshold lifecycle:
    /// 1. Below `build_threshold`, searches run against the flat (untrained) index.
    /// 2. After enough vectors are committed, an explicit `build_vector_index()`
    ///    trains the IVF-RaBitQ structures and searches still work.
    /// 3. A second `build_vector_index()` call is a no-op (training is skipped).
    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        // Create schema with dense vector field configured for IVF-RaBitQ
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true, // indexed
            true, // stored
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4), // Small for test
                nprobe: 2,
                build_threshold: Some(50), // Build when we have 50+ vectors
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Phase 1: Add vectors below threshold (should use Flat index)
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 documents (below threshold of 50)
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            // Constant embedding [i/30.0; 8] — deterministic, distinct per doc
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Open index and verify no trained structures exist yet (flat path)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        // Search should work with Flat index
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        // Phase 2: Add more vectors to cross threshold
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 more documents (total 60, above threshold of 50)
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Manually trigger vector index build (no longer auto-triggered by commit)
        writer.build_vector_index().await.unwrap();

        // Reopen index and verify trained structures are loaded
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        // Search should still work against the trained index
        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Phase 3: Verify calling build_vector_index again is a no-op
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap(); // Should skip training

        // Still built (trained structures present in ArcSwap)
        assert!(writer.segment_manager.trained().is_some());
    }
999
    /// Multi-round merge: flush many small segments, merge, add more, merge again.
    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
    ///
    /// Round 1 ingests 50 docs across 5 explicit flushes, force-merges to a
    /// single segment, then round 2 adds 30 more docs and merges again —
    /// checking counts, per-term hit totals, and doc retrieval after each step.
    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        // Tiny indexing-memory budget; segment boundaries come from the
        // explicit flush() calls below.
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        // --- Round 1: 5 segments × 10 docs = 50 docs ---
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            // One flush per batch → one segment per batch
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // Search before merge
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        // --- Merge round 1 ---
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // Search after first merge
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        // Verify all docs retrievable by id from the merged segment
        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        // --- Round 2: add 30 more docs in 3 segments ---
        // New titles use disjoint terms (delta/echo/foxtrot) so round 1 vs
        // round 2 docs can be distinguished by term query below.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        // Search spans both old merged segment and new segments
        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        // --- Merge round 2 ---
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        // All searches still correct after second merge
        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        // Verify all 80 docs retrievable
        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }
1152
1153    /// Large-scale merge: many segments with overlapping terms, verifying
1154    /// BM25 scoring and doc retrieval after merge.
1155    #[tokio::test]
1156    async fn test_large_scale_merge_correctness() {
1157        let mut schema_builder = SchemaBuilder::default();
1158        let title = schema_builder.add_text_field("title", true, true);
1159        let schema = schema_builder.build();
1160
1161        let dir = RamDirectory::new();
1162        let config = IndexConfig {
1163            max_indexing_memory_bytes: 512,
1164            ..Default::default()
1165        };
1166
1167        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1168            .await
1169            .unwrap();
1170
1171        // 8 batches × 25 docs = 200 docs total
1172        // Terms: "common" appears in all, "unique_N" appears in batch N only
1173        let total_docs = 200u32;
1174        for batch in 0..8 {
1175            for i in 0..25 {
1176                let mut doc = Document::new();
1177                doc.add_text(
1178                    title,
1179                    format!("common shared term unique_{} item{}", batch, i),
1180                );
1181                writer.add_document(doc).unwrap();
1182            }
1183            writer.flush().await.unwrap();
1184        }
1185        writer.commit().await.unwrap();
1186
1187        // Verify pre-merge
1188        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1189        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1190
1191        let results = index.query("common", 300).await.unwrap();
1192        assert_eq!(
1193            results.hits.len(),
1194            total_docs as usize,
1195            "all docs should match 'common'"
1196        );
1197
1198        // Each unique_N matches exactly 25 docs
1199        for batch in 0..8 {
1200            let q = format!("unique_{}", batch);
1201            let results = index.query(&q, 100).await.unwrap();
1202            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1203        }
1204
1205        // Force merge
1206        let writer = IndexWriter::open(dir.clone(), config.clone())
1207            .await
1208            .unwrap();
1209        writer.force_merge().await.unwrap();
1210
1211        // Verify post-merge: single segment, same results
1212        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1213        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1214        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1215
1216        let results = index.query("common", 300).await.unwrap();
1217        assert_eq!(results.hits.len(), total_docs as usize);
1218
1219        for batch in 0..8 {
1220            let q = format!("unique_{}", batch);
1221            let results = index.query(&q, 100).await.unwrap();
1222            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1223        }
1224
1225        // Verify doc retrieval for every doc
1226        let reader = index.reader().await.unwrap();
1227        let searcher = reader.searcher().await.unwrap();
1228        for i in 0..total_docs {
1229            let doc = searcher.doc(i).await.unwrap();
1230            assert!(doc.is_some(), "doc {} missing after merge", i);
1231        }
1232    }
1233
1234    /// Test that auto-merge is triggered by the merge policy during commit,
1235    /// without calling force_merge. Uses MmapDirectory and higher parallelism
1236    /// to reproduce production conditions.
1237    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1238    async fn test_auto_merge_triggered() {
1239        use crate::directories::MmapDirectory;
1240        let tmp_dir = tempfile::tempdir().unwrap();
1241        let dir = MmapDirectory::new(tmp_dir.path());
1242
1243        let mut schema_builder = SchemaBuilder::default();
1244        let title = schema_builder.add_text_field("title", true, true);
1245        let body = schema_builder.add_text_field("body", true, true);
1246        let schema = schema_builder.build();
1247
1248        // Aggressive policy: merge when 3 segments in same tier
1249        let config = IndexConfig {
1250            max_indexing_memory_bytes: 4096,
1251            num_indexing_threads: 4,
1252            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1253            ..Default::default()
1254        };
1255
1256        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1257            .await
1258            .unwrap();
1259
1260        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
1261        for batch in 0..12 {
1262            for i in 0..50 {
1263                let mut doc = Document::new();
1264                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1265                doc.add_text(
1266                    body,
1267                    format!(
1268                        "the quick brown fox jumps over lazy dog number {} round {}",
1269                        i, batch
1270                    ),
1271                );
1272                writer.add_document(doc).unwrap();
1273            }
1274            writer.flush().await.unwrap();
1275        }
1276        writer.commit().await.unwrap();
1277
1278        let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1279
1280        // Give background merges time to complete
1281        writer.wait_for_merges().await;
1282
1283        // After commit + auto-merge, segment count should be reduced
1284        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1285        let segment_count = index.segment_readers().await.unwrap().len();
1286        eprintln!(
1287            "Segments: {} before merge, {} after auto-merge",
1288            pre_merge, segment_count
1289        );
1290        assert!(
1291            segment_count < pre_merge,
1292            "Expected auto-merge to reduce segments from {}, got {}",
1293            pre_merge,
1294            segment_count
1295        );
1296    }
1297
    /// Regression test: commit with dense vector fields + aggressive merge policy.
    /// Exercises the race where background merge deletes segment files while
    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
    /// Before the fix, this would fail with "IO error: No such file or directory".
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        // RaBitQ with very low build_threshold so vector index building triggers during commit
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        // Aggressive merge: triggers background merges at 3 segments per tier
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 12 segments with vectors — enough to trigger both
        // background merges (aggressive policy) and vector index building (threshold=10)
        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                // 8-dim deterministic vector (values derived from i, j, batch)
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
                doc.add_dense_vector(embedding, vec);
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }

        // This commit triggers:
        // 1. register_segment × 12 → maybe_merge → background merges
        // 2. maybe_build_vector_index → collect_vectors_for_training
        // The race: step 1 deletes segment files while step 2 reads them.
        writer.commit().await.unwrap();
        writer.wait_for_merges().await;

        // If the race is handled, all 60 docs (12 batches × 5) survive.
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let num_docs = index.num_docs().await.unwrap();
        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
    }
1355
1356    /// Stress test: force_merge with many segments (iterative batching).
1357    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
1358    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1359    async fn test_force_merge_many_segments() {
1360        use crate::directories::MmapDirectory;
1361        let tmp_dir = tempfile::tempdir().unwrap();
1362        let dir = MmapDirectory::new(tmp_dir.path());
1363
1364        let mut schema_builder = SchemaBuilder::default();
1365        let title = schema_builder.add_text_field("title", true, true);
1366        let schema = schema_builder.build();
1367
1368        let config = IndexConfig {
1369            max_indexing_memory_bytes: 512,
1370            ..Default::default()
1371        };
1372
1373        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1374            .await
1375            .unwrap();
1376
1377        // Create 50 tiny segments
1378        for batch in 0..50 {
1379            for i in 0..3 {
1380                let mut doc = Document::new();
1381                doc.add_text(title, format!("term_{} batch_{}", i, batch));
1382                writer.add_document(doc).unwrap();
1383            }
1384            writer.flush().await.unwrap();
1385        }
1386        writer.commit().await.unwrap();
1387        // Wait for background merges before reading segment count
1388        writer.wait_for_merges().await;
1389
1390        let seg_ids = writer.segment_manager.get_segment_ids().await;
1391        let pre = seg_ids.len();
1392        eprintln!("Segments before force_merge: {}", pre);
1393        assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1394
1395        // Force merge all into one — should iterate in batches, not OOM
1396        writer.force_merge().await.unwrap();
1397
1398        let index2 = Index::open(dir, config).await.unwrap();
1399        let post = index2.segment_readers().await.unwrap().len();
1400        eprintln!("Segments after force_merge: {}", post);
1401        assert_eq!(post, 1);
1402        assert_eq!(index2.num_docs().await.unwrap(), 150);
1403    }
1404}