Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39    index_documents_from_reader, index_json_document, parse_schema,
40};
41
42/// Default file name for the slice cache
43pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
45/// Index configuration
46#[derive(Debug, Clone)]
47pub struct IndexConfig {
48    /// Number of threads for CPU-intensive tasks (search parallelism)
49    pub num_threads: usize,
50    /// Number of parallel segment builders (documents distributed round-robin)
51    pub num_indexing_threads: usize,
52    /// Number of threads for parallel block compression within each segment
53    pub num_compression_threads: usize,
54    /// Block cache size for term dictionary per segment
55    pub term_cache_blocks: usize,
56    /// Block cache size for document store per segment
57    pub store_cache_blocks: usize,
58    /// Max memory (bytes) across all builders before auto-commit (global limit)
59    pub max_indexing_memory_bytes: usize,
60    /// Merge policy for background segment merging
61    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
62    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
63    pub optimization: crate::structures::IndexOptimization,
64    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
65    pub reload_interval_ms: u64,
66}
67
68impl Default for IndexConfig {
69    fn default() -> Self {
70        #[cfg(feature = "native")]
71        let indexing_threads = crate::default_indexing_threads();
72        #[cfg(not(feature = "native"))]
73        let indexing_threads = 1;
74
75        #[cfg(feature = "native")]
76        let compression_threads = crate::default_compression_threads();
77        #[cfg(not(feature = "native"))]
78        let compression_threads = 1;
79
80        Self {
81            num_threads: indexing_threads,
82            num_indexing_threads: 1,
83            num_compression_threads: compression_threads,
84            term_cache_blocks: 256,
85            store_cache_blocks: 32,
86            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
87            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
88            optimization: crate::structures::IndexOptimization::default(),
89            reload_interval_ms: 1000, // 1 second default
90        }
91    }
92}
93
94/// Multi-segment async Index
95///
96/// The central concept for search. Owns segment lifecycle and provides:
97/// - `Index::create()` / `Index::open()` - create or open an index
98/// - `index.writer()` - get an IndexWriter for adding documents
99/// - `index.reader()` - get an IndexReader for searching with reload policy
100///
101/// All segment management is delegated to SegmentManager.
102#[cfg(feature = "native")]
103pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
104    directory: Arc<D>,
105    schema: Arc<Schema>,
106    config: IndexConfig,
107    /// Segment manager - owns segments, tracker, metadata, and trained structures
108    segment_manager: Arc<crate::merge::SegmentManager<D>>,
109    /// Cached reader (created lazily, reused across calls)
110    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
111}
112
113#[cfg(feature = "native")]
114impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
115    /// Create a new index in the directory
116    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
117        let directory = Arc::new(directory);
118        let schema = Arc::new(schema);
119        let metadata = IndexMetadata::new((*schema).clone());
120
121        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
122            Arc::clone(&directory),
123            Arc::clone(&schema),
124            metadata,
125            config.merge_policy.clone_box(),
126            config.term_cache_blocks,
127        ));
128
129        // Save initial metadata
130        segment_manager.update_metadata(|_| {}).await?;
131
132        Ok(Self {
133            directory,
134            schema,
135            config,
136            segment_manager,
137            cached_reader: tokio::sync::OnceCell::new(),
138        })
139    }
140
141    /// Open an existing index from a directory
142    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
143        let directory = Arc::new(directory);
144
145        // Load metadata (includes schema)
146        let metadata = IndexMetadata::load(directory.as_ref()).await?;
147        let schema = Arc::new(metadata.schema.clone());
148
149        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
150            Arc::clone(&directory),
151            Arc::clone(&schema),
152            metadata,
153            config.merge_policy.clone_box(),
154            config.term_cache_blocks,
155        ));
156
157        // Load trained structures into SegmentManager's ArcSwap
158        segment_manager.load_and_publish_trained().await;
159
160        Ok(Self {
161            directory,
162            schema,
163            config,
164            segment_manager,
165            cached_reader: tokio::sync::OnceCell::new(),
166        })
167    }
168
169    /// Get the schema
170    pub fn schema(&self) -> &Schema {
171        &self.schema
172    }
173
174    /// Get a reference to the underlying directory
175    pub fn directory(&self) -> &D {
176        &self.directory
177    }
178
179    /// Get the segment manager
180    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
181        &self.segment_manager
182    }
183
184    /// Get an IndexReader for searching (with reload policy)
185    ///
186    /// The reader is cached and reused across calls. The reader's internal
187    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
188    pub async fn reader(&self) -> Result<&IndexReader<D>> {
189        self.cached_reader
190            .get_or_try_init(|| async {
191                IndexReader::from_segment_manager(
192                    Arc::clone(&self.schema),
193                    Arc::clone(&self.segment_manager),
194                    self.config.term_cache_blocks,
195                    self.config.reload_interval_ms,
196                )
197                .await
198            })
199            .await
200    }
201
202    /// Get the config
203    pub fn config(&self) -> &IndexConfig {
204        &self.config
205    }
206
207    /// Get segment readers for query execution (convenience method)
208    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
209        let reader = self.reader().await?;
210        let searcher = reader.searcher().await?;
211        Ok(searcher.segment_readers().to_vec())
212    }
213
214    /// Total number of documents across all segments
215    pub async fn num_docs(&self) -> Result<u32> {
216        let reader = self.reader().await?;
217        let searcher = reader.searcher().await?;
218        Ok(searcher.num_docs())
219    }
220
221    /// Get default fields for search
222    pub fn default_fields(&self) -> Vec<crate::Field> {
223        if !self.schema.default_fields().is_empty() {
224            self.schema.default_fields().to_vec()
225        } else {
226            self.schema
227                .fields()
228                .filter(|(_, entry)| {
229                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
230                })
231                .map(|(field, _)| field)
232                .collect()
233        }
234    }
235
236    /// Get tokenizer registry
237    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
238        Arc::new(crate::tokenizer::TokenizerRegistry::default())
239    }
240
241    /// Create a query parser for this index
242    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
243        let default_fields = self.default_fields();
244        let tokenizers = self.tokenizers();
245
246        let query_routers = self.schema.query_routers();
247        if !query_routers.is_empty()
248            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
249        {
250            return crate::dsl::QueryLanguageParser::with_router(
251                Arc::clone(&self.schema),
252                default_fields,
253                tokenizers,
254                router,
255            );
256        }
257
258        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
259    }
260
261    /// Parse and search using a query string
262    pub async fn query(
263        &self,
264        query_str: &str,
265        limit: usize,
266    ) -> Result<crate::query::SearchResponse> {
267        self.query_offset(query_str, limit, 0).await
268    }
269
270    /// Query with offset for pagination
271    pub async fn query_offset(
272        &self,
273        query_str: &str,
274        limit: usize,
275        offset: usize,
276    ) -> Result<crate::query::SearchResponse> {
277        let parser = self.query_parser();
278        let query = parser
279            .parse(query_str)
280            .map_err(crate::error::Error::Query)?;
281        self.search_offset(query.as_ref(), limit, offset).await
282    }
283
284    /// Search and return results
285    pub async fn search(
286        &self,
287        query: &dyn crate::query::Query,
288        limit: usize,
289    ) -> Result<crate::query::SearchResponse> {
290        self.search_offset(query, limit, 0).await
291    }
292
293    /// Search with offset for pagination
294    pub async fn search_offset(
295        &self,
296        query: &dyn crate::query::Query,
297        limit: usize,
298        offset: usize,
299    ) -> Result<crate::query::SearchResponse> {
300        let reader = self.reader().await?;
301        let searcher = reader.searcher().await?;
302        let segments = searcher.segment_readers();
303
304        let fetch_limit = offset + limit;
305
306        let futures: Vec<_> = segments
307            .iter()
308            .map(|segment| {
309                let sid = segment.meta().id;
310                async move {
311                    let results =
312                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
313                    Ok::<_, crate::error::Error>(
314                        results
315                            .into_iter()
316                            .map(move |r| (sid, r))
317                            .collect::<Vec<_>>(),
318                    )
319                }
320            })
321            .collect();
322
323        let batches = futures::future::try_join_all(futures).await?;
324        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
325            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
326        for batch in batches {
327            all_results.extend(batch);
328        }
329
330        all_results.sort_by(|a, b| {
331            b.1.score
332                .partial_cmp(&a.1.score)
333                .unwrap_or(std::cmp::Ordering::Equal)
334        });
335
336        let total_hits = all_results.len() as u32;
337
338        let hits: Vec<crate::query::SearchHit> = all_results
339            .into_iter()
340            .skip(offset)
341            .take(limit)
342            .map(|(segment_id, result)| crate::query::SearchHit {
343                address: crate::query::DocAddress::new(segment_id, result.doc_id),
344                score: result.score,
345                matched_fields: result.extract_ordinals(),
346            })
347            .collect();
348
349        Ok(crate::query::SearchResponse { hits, total_hits })
350    }
351
352    /// Get a document by its unique address
353    pub async fn get_document(
354        &self,
355        address: &crate::query::DocAddress,
356    ) -> Result<Option<crate::dsl::Document>> {
357        let reader = self.reader().await?;
358        let searcher = reader.searcher().await?;
359        searcher.get_document(address).await
360    }
361
362    /// Get posting lists for a term across all segments
363    pub async fn get_postings(
364        &self,
365        field: crate::Field,
366        term: &[u8],
367    ) -> Result<
368        Vec<(
369            Arc<crate::segment::SegmentReader>,
370            crate::structures::BlockPostingList,
371        )>,
372    > {
373        let segments = self.segment_readers().await?;
374        let mut results = Vec::new();
375
376        for segment in segments {
377            if let Some(postings) = segment.get_postings(field, term).await? {
378                results.push((segment, postings));
379            }
380        }
381
382        Ok(results)
383    }
384}
385
386/// Native-only methods for Index
387#[cfg(feature = "native")]
388impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
389    /// Get an IndexWriter for adding documents
390    pub fn writer(&self) -> writer::IndexWriter<D> {
391        writer::IndexWriter::from_index(self)
392    }
393}
394
395#[cfg(test)]
396mod tests {
397    use super::*;
398    use crate::directories::RamDirectory;
399    use crate::dsl::{Document, SchemaBuilder};
400
401    #[tokio::test]
402    async fn test_index_create_and_search() {
403        let mut schema_builder = SchemaBuilder::default();
404        let title = schema_builder.add_text_field("title", true, true);
405        let body = schema_builder.add_text_field("body", true, true);
406        let schema = schema_builder.build();
407
408        let dir = RamDirectory::new();
409        let config = IndexConfig::default();
410
411        // Create index and add documents
412        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
413            .await
414            .unwrap();
415
416        let mut doc1 = Document::new();
417        doc1.add_text(title, "Hello World");
418        doc1.add_text(body, "This is the first document");
419        writer.add_document(doc1).unwrap();
420
421        let mut doc2 = Document::new();
422        doc2.add_text(title, "Goodbye World");
423        doc2.add_text(body, "This is the second document");
424        writer.add_document(doc2).unwrap();
425
426        writer.commit().await.unwrap();
427
428        // Open for reading
429        let index = Index::open(dir, config).await.unwrap();
430        assert_eq!(index.num_docs().await.unwrap(), 2);
431
432        // Check postings
433        let postings = index.get_postings(title, b"world").await.unwrap();
434        assert_eq!(postings.len(), 1); // One segment
435        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
436
437        // Retrieve document via searcher snapshot
438        let reader = index.reader().await.unwrap();
439        let searcher = reader.searcher().await.unwrap();
440        let doc = searcher.doc(0).await.unwrap().unwrap();
441        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
442    }
443
444    #[tokio::test]
445    async fn test_multiple_segments() {
446        let mut schema_builder = SchemaBuilder::default();
447        let title = schema_builder.add_text_field("title", true, true);
448        let schema = schema_builder.build();
449
450        let dir = RamDirectory::new();
451        let config = IndexConfig {
452            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
453            ..Default::default()
454        };
455
456        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
457            .await
458            .unwrap();
459
460        // Add documents in batches to create multiple segments
461        for batch in 0..3 {
462            for i in 0..5 {
463                let mut doc = Document::new();
464                doc.add_text(title, format!("Document {} batch {}", i, batch));
465                writer.add_document(doc).unwrap();
466            }
467            writer.commit().await.unwrap();
468        }
469
470        // Open and check
471        let index = Index::open(dir, config).await.unwrap();
472        assert_eq!(index.num_docs().await.unwrap(), 15);
473        // With queue-based indexing, exact segment count varies
474        assert!(
475            index.segment_readers().await.unwrap().len() >= 2,
476            "Expected multiple segments"
477        );
478    }
479
480    #[tokio::test]
481    async fn test_segment_merge() {
482        let mut schema_builder = SchemaBuilder::default();
483        let title = schema_builder.add_text_field("title", true, true);
484        let schema = schema_builder.build();
485
486        let dir = RamDirectory::new();
487        let config = IndexConfig {
488            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
489            ..Default::default()
490        };
491
492        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
493            .await
494            .unwrap();
495
496        // Create multiple segments by flushing between batches
497        for batch in 0..3 {
498            for i in 0..3 {
499                let mut doc = Document::new();
500                doc.add_text(title, format!("Document {} batch {}", i, batch));
501                writer.add_document(doc).unwrap();
502            }
503            writer.commit().await.unwrap();
504        }
505
506        // Should have multiple segments (at least 2, one per flush with docs)
507        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
508        assert!(
509            index.segment_readers().await.unwrap().len() >= 2,
510            "Expected multiple segments"
511        );
512
513        // Force merge
514        let mut writer = IndexWriter::open(dir.clone(), config.clone())
515            .await
516            .unwrap();
517        writer.force_merge().await.unwrap();
518
519        // Should have 1 segment now
520        let index = Index::open(dir, config).await.unwrap();
521        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
522        assert_eq!(index.num_docs().await.unwrap(), 9);
523
524        // Verify all documents accessible (order may vary with queue-based indexing)
525        let reader = index.reader().await.unwrap();
526        let searcher = reader.searcher().await.unwrap();
527        let mut found_docs = 0;
528        for i in 0..9 {
529            if searcher.doc(i).await.unwrap().is_some() {
530                found_docs += 1;
531            }
532        }
533        assert_eq!(found_docs, 9);
534    }
535
536    #[tokio::test]
537    async fn test_match_query() {
538        let mut schema_builder = SchemaBuilder::default();
539        let title = schema_builder.add_text_field("title", true, true);
540        let body = schema_builder.add_text_field("body", true, true);
541        let schema = schema_builder.build();
542
543        let dir = RamDirectory::new();
544        let config = IndexConfig::default();
545
546        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
547            .await
548            .unwrap();
549
550        let mut doc1 = Document::new();
551        doc1.add_text(title, "rust programming");
552        doc1.add_text(body, "Learn rust language");
553        writer.add_document(doc1).unwrap();
554
555        let mut doc2 = Document::new();
556        doc2.add_text(title, "python programming");
557        doc2.add_text(body, "Learn python language");
558        writer.add_document(doc2).unwrap();
559
560        writer.commit().await.unwrap();
561
562        let index = Index::open(dir, config).await.unwrap();
563
564        // Test match query with multiple default fields
565        let results = index.query("rust", 10).await.unwrap();
566        assert_eq!(results.hits.len(), 1);
567
568        // Test match query with multiple tokens
569        let results = index.query("rust programming", 10).await.unwrap();
570        assert!(!results.hits.is_empty());
571
572        // Verify hit has address (segment_id + doc_id)
573        let hit = &results.hits[0];
574        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
575
576        // Verify document retrieval by address
577        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
578        assert!(
579            !doc.field_values().is_empty(),
580            "Doc should have field values"
581        );
582
583        // Also verify doc retrieval via searcher snapshot
584        let reader = index.reader().await.unwrap();
585        let searcher = reader.searcher().await.unwrap();
586        let doc = searcher.doc(0).await.unwrap().unwrap();
587        assert!(
588            !doc.field_values().is_empty(),
589            "Doc should have field values"
590        );
591    }
592
593    #[tokio::test]
594    async fn test_slice_cache_warmup_and_load() {
595        use crate::directories::SliceCachingDirectory;
596
597        let mut schema_builder = SchemaBuilder::default();
598        let title = schema_builder.add_text_field("title", true, true);
599        let body = schema_builder.add_text_field("body", true, true);
600        let schema = schema_builder.build();
601
602        let dir = RamDirectory::new();
603        let config = IndexConfig::default();
604
605        // Create index with some documents
606        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
607            .await
608            .unwrap();
609
610        for i in 0..10 {
611            let mut doc = Document::new();
612            doc.add_text(title, format!("Document {} about rust", i));
613            doc.add_text(body, format!("This is body text number {}", i));
614            writer.add_document(doc).unwrap();
615        }
616        writer.commit().await.unwrap();
617
618        // Open with slice caching and perform some operations to warm up cache
619        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
620        let index = Index::open(caching_dir, config.clone()).await.unwrap();
621
622        // Perform a search to warm up the cache
623        let results = index.query("rust", 10).await.unwrap();
624        assert!(!results.hits.is_empty());
625
626        // Check cache stats - should have cached some data
627        let stats = index.directory.stats();
628        assert!(stats.total_bytes > 0, "Cache should have data after search");
629    }
630
631    #[tokio::test]
632    async fn test_multivalue_field_indexing_and_search() {
633        let mut schema_builder = SchemaBuilder::default();
634        let uris = schema_builder.add_text_field("uris", true, true);
635        let title = schema_builder.add_text_field("title", true, true);
636        let schema = schema_builder.build();
637
638        let dir = RamDirectory::new();
639        let config = IndexConfig::default();
640
641        // Create index and add document with multi-value field
642        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
643            .await
644            .unwrap();
645
646        let mut doc = Document::new();
647        doc.add_text(uris, "one");
648        doc.add_text(uris, "two");
649        doc.add_text(title, "Test Document");
650        writer.add_document(doc).unwrap();
651
652        // Add another document with different uris
653        let mut doc2 = Document::new();
654        doc2.add_text(uris, "three");
655        doc2.add_text(title, "Another Document");
656        writer.add_document(doc2).unwrap();
657
658        writer.commit().await.unwrap();
659
660        // Open for reading
661        let index = Index::open(dir, config).await.unwrap();
662        assert_eq!(index.num_docs().await.unwrap(), 2);
663
664        // Verify document retrieval preserves all values
665        let reader = index.reader().await.unwrap();
666        let searcher = reader.searcher().await.unwrap();
667        let doc = searcher.doc(0).await.unwrap().unwrap();
668        let all_uris: Vec<_> = doc.get_all(uris).collect();
669        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
670        assert_eq!(all_uris[0].as_text(), Some("one"));
671        assert_eq!(all_uris[1].as_text(), Some("two"));
672
673        // Verify to_json returns array for multi-value field
674        let json = doc.to_json(index.schema());
675        let uris_json = json.get("uris").unwrap();
676        assert!(uris_json.is_array(), "Multi-value field should be an array");
677        let uris_arr = uris_json.as_array().unwrap();
678        assert_eq!(uris_arr.len(), 2);
679        assert_eq!(uris_arr[0].as_str(), Some("one"));
680        assert_eq!(uris_arr[1].as_str(), Some("two"));
681
682        // Verify both values are searchable
683        let results = index.query("uris:one", 10).await.unwrap();
684        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
685        assert_eq!(results.hits[0].address.doc_id, 0);
686
687        let results = index.query("uris:two", 10).await.unwrap();
688        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
689        assert_eq!(results.hits[0].address.doc_id, 0);
690
691        let results = index.query("uris:three", 10).await.unwrap();
692        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
693        assert_eq!(results.hits[0].address.doc_id, 1);
694
695        // Verify searching for non-existent value returns no results
696        let results = index.query("uris:nonexistent", 10).await.unwrap();
697        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
698    }
699
700    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
701    ///
702    /// This test verifies that:
703    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
704    /// 2. Search results are correct regardless of WAND optimization
705    /// 3. Scores are reasonable for matching documents
706    #[tokio::test]
707    async fn test_wand_optimization_for_or_queries() {
708        use crate::query::{BooleanQuery, TermQuery};
709
710        let mut schema_builder = SchemaBuilder::default();
711        let content = schema_builder.add_text_field("content", true, true);
712        let schema = schema_builder.build();
713
714        let dir = RamDirectory::new();
715        let config = IndexConfig::default();
716
717        // Create index with documents containing various terms
718        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
719            .await
720            .unwrap();
721
722        // Doc 0: contains "rust" and "programming"
723        let mut doc = Document::new();
724        doc.add_text(content, "rust programming language is fast");
725        writer.add_document(doc).unwrap();
726
727        // Doc 1: contains "rust" only
728        let mut doc = Document::new();
729        doc.add_text(content, "rust is a systems language");
730        writer.add_document(doc).unwrap();
731
732        // Doc 2: contains "programming" only
733        let mut doc = Document::new();
734        doc.add_text(content, "programming is fun");
735        writer.add_document(doc).unwrap();
736
737        // Doc 3: contains "python" (neither rust nor programming)
738        let mut doc = Document::new();
739        doc.add_text(content, "python is easy to learn");
740        writer.add_document(doc).unwrap();
741
742        // Doc 4: contains both "rust" and "programming" multiple times
743        let mut doc = Document::new();
744        doc.add_text(content, "rust rust programming programming systems");
745        writer.add_document(doc).unwrap();
746
747        writer.commit().await.unwrap();
748
749        // Open for reading
750        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
751
752        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
753        let or_query = BooleanQuery::new()
754            .should(TermQuery::text(content, "rust"))
755            .should(TermQuery::text(content, "programming"));
756
757        let results = index.search(&or_query, 10).await.unwrap();
758
759        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
760        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
761
762        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
763        assert!(doc_ids.contains(&0), "Should find doc 0");
764        assert!(doc_ids.contains(&1), "Should find doc 1");
765        assert!(doc_ids.contains(&2), "Should find doc 2");
766        assert!(doc_ids.contains(&4), "Should find doc 4");
767        assert!(
768            !doc_ids.contains(&3),
769            "Should NOT find doc 3 (only has 'python')"
770        );
771
772        // Test 2: Single term query (should NOT use WAND, but still work)
773        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
774
775        let results = index.search(&single_query, 10).await.unwrap();
776        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
777
778        // Test 3: Query with MUST (should NOT use WAND)
779        let must_query = BooleanQuery::new()
780            .must(TermQuery::text(content, "rust"))
781            .should(TermQuery::text(content, "programming"));
782
783        let results = index.search(&must_query, 10).await.unwrap();
784        // Must have "rust", optionally "programming"
785        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
786
787        // Test 4: Query with MUST_NOT (should NOT use WAND)
788        let must_not_query = BooleanQuery::new()
789            .should(TermQuery::text(content, "rust"))
790            .should(TermQuery::text(content, "programming"))
791            .must_not(TermQuery::text(content, "systems"));
792
793        let results = index.search(&must_not_query, 10).await.unwrap();
794        // Should exclude docs with "systems" (doc 1 and 4)
795        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
796        assert!(
797            !doc_ids.contains(&1),
798            "Should NOT find doc 1 (has 'systems')"
799        );
800        assert!(
801            !doc_ids.contains(&4),
802            "Should NOT find doc 4 (has 'systems')"
803        );
804
805        // Test 5: Verify top-k limit works correctly with WAND
806        let or_query = BooleanQuery::new()
807            .should(TermQuery::text(content, "rust"))
808            .should(TermQuery::text(content, "programming"));
809
810        let results = index.search(&or_query, 2).await.unwrap();
811        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
812
813        // Top results should be docs that match both terms (higher scores)
814        // Doc 0 and 4 contain both "rust" and "programming"
815    }
816
817    /// Test that WAND optimization produces same results as non-WAND for correctness
818    #[tokio::test]
819    async fn test_wand_results_match_standard_boolean() {
820        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
821
822        let mut schema_builder = SchemaBuilder::default();
823        let content = schema_builder.add_text_field("content", true, true);
824        let schema = schema_builder.build();
825
826        let dir = RamDirectory::new();
827        let config = IndexConfig::default();
828
829        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
830            .await
831            .unwrap();
832
833        // Add several documents
834        for i in 0..10 {
835            let mut doc = Document::new();
836            let text = match i % 4 {
837                0 => "apple banana cherry",
838                1 => "apple orange",
839                2 => "banana grape",
840                _ => "cherry date",
841            };
842            doc.add_text(content, text);
843            writer.add_document(doc).unwrap();
844        }
845
846        writer.commit().await.unwrap();
847        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
848
849        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
850        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
851
852        let bool_query = BooleanQuery::new()
853            .should(TermQuery::text(content, "apple"))
854            .should(TermQuery::text(content, "banana"));
855
856        let wand_results = index.search(&wand_query, 10).await.unwrap();
857        let bool_results = index.search(&bool_query, 10).await.unwrap();
858
859        // Both should find the same documents
860        assert_eq!(
861            wand_results.hits.len(),
862            bool_results.hits.len(),
863            "WAND and Boolean should find same number of docs"
864        );
865
866        let wand_docs: std::collections::HashSet<u32> =
867            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
868        let bool_docs: std::collections::HashSet<u32> =
869            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
870
871        assert_eq!(
872            wand_docs, bool_docs,
873            "WAND and Boolean should find same documents"
874        );
875    }
876
877    #[tokio::test]
878    async fn test_vector_index_threshold_switch() {
879        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
880
881        // Create schema with dense vector field configured for IVF-RaBitQ
882        let mut schema_builder = SchemaBuilder::default();
883        let title = schema_builder.add_text_field("title", true, true);
884        let embedding = schema_builder.add_dense_vector_field_with_config(
885            "embedding",
886            true, // indexed
887            true, // stored
888            DenseVectorConfig {
889                dim: 8,
890                index_type: VectorIndexType::IvfRaBitQ,
891                quantization: DenseVectorQuantization::F32,
892                num_clusters: Some(4), // Small for test
893                nprobe: 2,
894                build_threshold: Some(50), // Build when we have 50+ vectors
895            },
896        );
897        let schema = schema_builder.build();
898
899        let dir = RamDirectory::new();
900        let config = IndexConfig::default();
901
902        // Phase 1: Add vectors below threshold (should use Flat index)
903        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
904            .await
905            .unwrap();
906
907        // Add 30 documents (below threshold of 50)
908        for i in 0..30 {
909            let mut doc = Document::new();
910            doc.add_text(title, format!("Document {}", i));
911            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
912            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
913            doc.add_dense_vector(embedding, vec);
914            writer.add_document(doc).unwrap();
915        }
916        writer.commit().await.unwrap();
917
918        // Open index and verify it's using Flat (not built yet)
919        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
920        assert!(
921            index.segment_manager.trained().is_none(),
922            "Should not have trained centroids below threshold"
923        );
924
925        // Search should work with Flat index
926        let query_vec: Vec<f32> = vec![0.5; 8];
927        let segments = index.segment_readers().await.unwrap();
928        assert!(!segments.is_empty());
929
930        let results = segments[0]
931            .search_dense_vector(
932                embedding,
933                &query_vec,
934                5,
935                0,
936                1,
937                crate::query::MultiValueCombiner::Max,
938            )
939            .await
940            .unwrap();
941        assert!(!results.is_empty(), "Flat search should return results");
942
943        // Phase 2: Add more vectors to cross threshold
944        let mut writer = IndexWriter::open(dir.clone(), config.clone())
945            .await
946            .unwrap();
947
948        // Add 30 more documents (total 60, above threshold of 50)
949        for i in 30..60 {
950            let mut doc = Document::new();
951            doc.add_text(title, format!("Document {}", i));
952            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
953            doc.add_dense_vector(embedding, vec);
954            writer.add_document(doc).unwrap();
955        }
956        writer.commit().await.unwrap();
957
958        // Manually trigger vector index build (no longer auto-triggered by commit)
959        writer.build_vector_index().await.unwrap();
960
961        // Reopen index and verify trained structures are loaded
962        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
963        assert!(
964            index.segment_manager.trained().is_some(),
965            "Should have loaded trained centroids for embedding field"
966        );
967
968        // Search should still work
969        let segments = index.segment_readers().await.unwrap();
970        let results = segments[0]
971            .search_dense_vector(
972                embedding,
973                &query_vec,
974                5,
975                0,
976                1,
977                crate::query::MultiValueCombiner::Max,
978            )
979            .await
980            .unwrap();
981        assert!(
982            !results.is_empty(),
983            "Search should return results after build"
984        );
985
986        // Phase 3: Verify calling build_vector_index again is a no-op
987        let writer = IndexWriter::open(dir.clone(), config.clone())
988            .await
989            .unwrap();
990        writer.build_vector_index().await.unwrap(); // Should skip training
991
992        // Still built (trained structures present in ArcSwap)
993        assert!(writer.segment_manager.trained().is_some());
994    }
995
996    /// Multi-round merge: flush many small segments, merge, add more, merge again.
997    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
998    #[tokio::test]
999    async fn test_multi_round_merge_with_search() {
1000        let mut schema_builder = SchemaBuilder::default();
1001        let title = schema_builder.add_text_field("title", true, true);
1002        let body = schema_builder.add_text_field("body", true, true);
1003        let schema = schema_builder.build();
1004
1005        let dir = RamDirectory::new();
1006        let config = IndexConfig {
1007            max_indexing_memory_bytes: 512,
1008            ..Default::default()
1009        };
1010
1011        // --- Round 1: 5 segments × 10 docs = 50 docs ---
1012        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1013            .await
1014            .unwrap();
1015
1016        for batch in 0..5 {
1017            for i in 0..10 {
1018                let mut doc = Document::new();
1019                doc.add_text(
1020                    title,
1021                    format!("alpha bravo charlie batch{} doc{}", batch, i),
1022                );
1023                doc.add_text(
1024                    body,
1025                    format!("the quick brown fox jumps over the lazy dog number {}", i),
1026                );
1027                writer.add_document(doc).unwrap();
1028            }
1029            writer.commit().await.unwrap();
1030        }
1031
1032        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1033        let pre_merge_segments = index.segment_readers().await.unwrap().len();
1034        assert!(
1035            pre_merge_segments >= 3,
1036            "Expected >=3 segments, got {}",
1037            pre_merge_segments
1038        );
1039        assert_eq!(index.num_docs().await.unwrap(), 50);
1040
1041        // Search before merge
1042        let results = index.query("alpha", 100).await.unwrap();
1043        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1044
1045        let results = index.query("fox", 100).await.unwrap();
1046        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1047
1048        // --- Merge round 1 ---
1049        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1050            .await
1051            .unwrap();
1052        writer.force_merge().await.unwrap();
1053
1054        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1055        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1056        assert_eq!(index.num_docs().await.unwrap(), 50);
1057
1058        // Search after first merge
1059        let results = index.query("alpha", 100).await.unwrap();
1060        assert_eq!(
1061            results.hits.len(),
1062            50,
1063            "all 50 docs should match 'alpha' after merge 1"
1064        );
1065
1066        let results = index.query("fox", 100).await.unwrap();
1067        assert_eq!(
1068            results.hits.len(),
1069            50,
1070            "all 50 docs should match 'fox' after merge 1"
1071        );
1072
1073        // Verify all docs retrievable
1074        let reader1 = index.reader().await.unwrap();
1075        let searcher1 = reader1.searcher().await.unwrap();
1076        for i in 0..50 {
1077            let doc = searcher1.doc(i).await.unwrap();
1078            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1079        }
1080
1081        // --- Round 2: add 30 more docs in 3 segments ---
1082        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1083            .await
1084            .unwrap();
1085        for batch in 0..3 {
1086            for i in 0..10 {
1087                let mut doc = Document::new();
1088                doc.add_text(
1089                    title,
1090                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1091                );
1092                doc.add_text(
1093                    body,
1094                    format!("the quick brown fox jumps again number {}", i),
1095                );
1096                writer.add_document(doc).unwrap();
1097            }
1098            writer.commit().await.unwrap();
1099        }
1100
1101        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1102        assert_eq!(index.num_docs().await.unwrap(), 80);
1103        assert!(
1104            index.segment_readers().await.unwrap().len() >= 2,
1105            "Should have >=2 segments after round 2 ingestion"
1106        );
1107
1108        // Search spans both old merged segment and new segments
1109        let results = index.query("fox", 100).await.unwrap();
1110        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1111
1112        let results = index.query("alpha", 100).await.unwrap();
1113        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1114
1115        let results = index.query("delta", 100).await.unwrap();
1116        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1117
1118        // --- Merge round 2 ---
1119        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1120            .await
1121            .unwrap();
1122        writer.force_merge().await.unwrap();
1123
1124        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1125        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1126        assert_eq!(index.num_docs().await.unwrap(), 80);
1127
1128        // All searches still correct after second merge
1129        let results = index.query("fox", 100).await.unwrap();
1130        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1131
1132        let results = index.query("alpha", 100).await.unwrap();
1133        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1134
1135        let results = index.query("delta", 100).await.unwrap();
1136        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1137
1138        // Verify all 80 docs retrievable
1139        let reader2 = index.reader().await.unwrap();
1140        let searcher2 = reader2.searcher().await.unwrap();
1141        for i in 0..80 {
1142            let doc = searcher2.doc(i).await.unwrap();
1143            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1144        }
1145    }
1146
1147    /// Large-scale merge: many segments with overlapping terms, verifying
1148    /// BM25 scoring and doc retrieval after merge.
1149    #[tokio::test]
1150    async fn test_large_scale_merge_correctness() {
1151        let mut schema_builder = SchemaBuilder::default();
1152        let title = schema_builder.add_text_field("title", true, true);
1153        let schema = schema_builder.build();
1154
1155        let dir = RamDirectory::new();
1156        let config = IndexConfig {
1157            max_indexing_memory_bytes: 512,
1158            ..Default::default()
1159        };
1160
1161        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1162            .await
1163            .unwrap();
1164
1165        // 8 batches × 25 docs = 200 docs total
1166        // Terms: "common" appears in all, "unique_N" appears in batch N only
1167        let total_docs = 200u32;
1168        for batch in 0..8 {
1169            for i in 0..25 {
1170                let mut doc = Document::new();
1171                doc.add_text(
1172                    title,
1173                    format!("common shared term unique_{} item{}", batch, i),
1174                );
1175                writer.add_document(doc).unwrap();
1176            }
1177            writer.commit().await.unwrap();
1178        }
1179
1180        // Verify pre-merge
1181        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1182        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1183
1184        let results = index.query("common", 300).await.unwrap();
1185        assert_eq!(
1186            results.hits.len(),
1187            total_docs as usize,
1188            "all docs should match 'common'"
1189        );
1190
1191        // Each unique_N matches exactly 25 docs
1192        for batch in 0..8 {
1193            let q = format!("unique_{}", batch);
1194            let results = index.query(&q, 100).await.unwrap();
1195            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1196        }
1197
1198        // Force merge
1199        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1200            .await
1201            .unwrap();
1202        writer.force_merge().await.unwrap();
1203
1204        // Verify post-merge: single segment, same results
1205        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1206        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1207        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1208
1209        let results = index.query("common", 300).await.unwrap();
1210        assert_eq!(results.hits.len(), total_docs as usize);
1211
1212        for batch in 0..8 {
1213            let q = format!("unique_{}", batch);
1214            let results = index.query(&q, 100).await.unwrap();
1215            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1216        }
1217
1218        // Verify doc retrieval for every doc
1219        let reader = index.reader().await.unwrap();
1220        let searcher = reader.searcher().await.unwrap();
1221        for i in 0..total_docs {
1222            let doc = searcher.doc(i).await.unwrap();
1223            assert!(doc.is_some(), "doc {} missing after merge", i);
1224        }
1225    }
1226
1227    /// Test that auto-merge is triggered by the merge policy during commit,
1228    /// without calling force_merge. Uses MmapDirectory and higher parallelism
1229    /// to reproduce production conditions.
1230    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1231    async fn test_auto_merge_triggered() {
1232        use crate::directories::MmapDirectory;
1233        let tmp_dir = tempfile::tempdir().unwrap();
1234        let dir = MmapDirectory::new(tmp_dir.path());
1235
1236        let mut schema_builder = SchemaBuilder::default();
1237        let title = schema_builder.add_text_field("title", true, true);
1238        let body = schema_builder.add_text_field("body", true, true);
1239        let schema = schema_builder.build();
1240
1241        // Aggressive policy: merge when 3 segments in same tier
1242        let config = IndexConfig {
1243            max_indexing_memory_bytes: 4096,
1244            num_indexing_threads: 4,
1245            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1246            ..Default::default()
1247        };
1248
1249        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1250            .await
1251            .unwrap();
1252
1253        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
1254        for batch in 0..12 {
1255            for i in 0..50 {
1256                let mut doc = Document::new();
1257                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1258                doc.add_text(
1259                    body,
1260                    format!(
1261                        "the quick brown fox jumps over lazy dog number {} round {}",
1262                        i, batch
1263                    ),
1264                );
1265                writer.add_document(doc).unwrap();
1266            }
1267            writer.commit().await.unwrap();
1268        }
1269
1270        let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1271
1272        // wait_for_merging_thread waits for the single in-flight merge. After it completes,
1273        // re-evaluate since segments accumulated while the merge was running.
1274        writer.wait_for_merging_thread().await;
1275        writer.maybe_merge().await;
1276        writer.wait_for_merging_thread().await;
1277
1278        // After commit + auto-merge, segment count should be reduced
1279        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1280        let segment_count = index.segment_readers().await.unwrap().len();
1281        eprintln!(
1282            "Segments: {} before merge, {} after auto-merge",
1283            pre_merge, segment_count
1284        );
1285        assert!(
1286            segment_count < pre_merge,
1287            "Expected auto-merge to reduce segments from {}, got {}",
1288            pre_merge,
1289            segment_count
1290        );
1291    }
1292
1293    /// Regression test: commit with dense vector fields + aggressive merge policy.
1294    /// Exercises the race where background merge deletes segment files while
1295    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
1296    /// Before the fix, this would fail with "IO error: No such file or directory".
1297    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1298    async fn test_commit_with_vectors_and_background_merge() {
1299        use crate::directories::MmapDirectory;
1300        use crate::dsl::DenseVectorConfig;
1301
1302        let tmp_dir = tempfile::tempdir().unwrap();
1303        let dir = MmapDirectory::new(tmp_dir.path());
1304
1305        let mut schema_builder = SchemaBuilder::default();
1306        let title = schema_builder.add_text_field("title", true, true);
1307        // RaBitQ with very low build_threshold so vector index building triggers during commit
1308        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1309        let embedding =
1310            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1311        let schema = schema_builder.build();
1312
1313        // Aggressive merge: triggers background merges at 3 segments per tier
1314        let config = IndexConfig {
1315            max_indexing_memory_bytes: 4096,
1316            num_indexing_threads: 4,
1317            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1318            ..Default::default()
1319        };
1320
1321        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1322            .await
1323            .unwrap();
1324
1325        // Create 12 segments with vectors — enough to trigger both
1326        // background merges (aggressive policy) and vector index building (threshold=10)
1327        for batch in 0..12 {
1328            for i in 0..5 {
1329                let mut doc = Document::new();
1330                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1331                // 8-dim random vector
1332                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1333                doc.add_dense_vector(embedding, vec);
1334                writer.add_document(doc).unwrap();
1335            }
1336            writer.commit().await.unwrap();
1337        }
1338        writer.wait_for_merging_thread().await;
1339
1340        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1341        let num_docs = index.num_docs().await.unwrap();
1342        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1343    }
1344
1345    /// Stress test: force_merge with many segments (iterative batching).
1346    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
1347    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1348    async fn test_force_merge_many_segments() {
1349        use crate::directories::MmapDirectory;
1350        let tmp_dir = tempfile::tempdir().unwrap();
1351        let dir = MmapDirectory::new(tmp_dir.path());
1352
1353        let mut schema_builder = SchemaBuilder::default();
1354        let title = schema_builder.add_text_field("title", true, true);
1355        let schema = schema_builder.build();
1356
1357        let config = IndexConfig {
1358            max_indexing_memory_bytes: 512,
1359            ..Default::default()
1360        };
1361
1362        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1363            .await
1364            .unwrap();
1365
1366        // Create 50 tiny segments
1367        for batch in 0..50 {
1368            for i in 0..3 {
1369                let mut doc = Document::new();
1370                doc.add_text(title, format!("term_{} batch_{}", i, batch));
1371                writer.add_document(doc).unwrap();
1372            }
1373            writer.commit().await.unwrap();
1374        }
1375        // Wait for background merges before reading segment count
1376        writer.wait_for_merging_thread().await;
1377
1378        let seg_ids = writer.segment_manager.get_segment_ids().await;
1379        let pre = seg_ids.len();
1380        eprintln!("Segments before force_merge: {}", pre);
1381        assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1382
1383        // Force merge all into one — should iterate in batches, not OOM
1384        writer.force_merge().await.unwrap();
1385
1386        let index2 = Index::open(dir, config).await.unwrap();
1387        let post = index2.segment_readers().await.unwrap().len();
1388        eprintln!("Segments after force_merge: {}", post);
1389        assert_eq!(post, 1);
1390        assert_eq!(index2.num_docs().await.unwrap(), 150);
1391    }
1392
1393    /// Test that background merges produce correct generation metadata.
1394    /// Creates many segments with aggressive policy, commits, waits for merges,
1395    /// and verifies that merged segments have generation >= 1 with correct ancestors.
1396    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1397    async fn test_background_merge_generation() {
1398        use crate::directories::MmapDirectory;
1399        let tmp_dir = tempfile::tempdir().unwrap();
1400        let dir = MmapDirectory::new(tmp_dir.path());
1401
1402        let mut schema_builder = SchemaBuilder::default();
1403        let title = schema_builder.add_text_field("title", true, true);
1404        let schema = schema_builder.build();
1405
1406        let config = IndexConfig {
1407            max_indexing_memory_bytes: 4096,
1408            num_indexing_threads: 2,
1409            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1410            ..Default::default()
1411        };
1412
1413        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1414            .await
1415            .unwrap();
1416
1417        // Create 15 small segments — enough for aggressive policy to trigger merges
1418        for batch in 0..15 {
1419            for i in 0..5 {
1420                let mut doc = Document::new();
1421                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1422                writer.add_document(doc).unwrap();
1423            }
1424            writer.commit().await.unwrap();
1425        }
1426        writer.wait_for_merging_thread().await;
1427
1428        // Read metadata and verify generation tracking
1429        let metas = writer
1430            .segment_manager
1431            .read_metadata(|m| m.segment_metas.clone())
1432            .await;
1433
1434        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1435        eprintln!(
1436            "Segments after merge: {}, max generation: {}",
1437            metas.len(),
1438            max_gen
1439        );
1440
1441        // Background merges should have produced at least one merged segment (gen >= 1)
1442        assert!(
1443            max_gen >= 1,
1444            "Expected at least one merged segment (gen >= 1), got max_gen={}",
1445            max_gen
1446        );
1447
1448        // Every merged segment (gen > 0) must have non-empty ancestors
1449        for (id, info) in &metas {
1450            if info.generation > 0 {
1451                assert!(
1452                    !info.ancestors.is_empty(),
1453                    "Segment {} has gen={} but no ancestors",
1454                    id,
1455                    info.generation
1456                );
1457            } else {
1458                assert!(
1459                    info.ancestors.is_empty(),
1460                    "Fresh segment {} has gen=0 but has ancestors",
1461                    id
1462                );
1463            }
1464        }
1465    }
1466
1467    /// Test that merging preserves every single document.
1468    /// Indexes 1000+ unique documents across many segments, force-merges,
1469    /// and verifies exact doc count and that every unique term is searchable.
1470    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1471    async fn test_merge_preserves_all_documents() {
1472        use crate::directories::MmapDirectory;
1473        let tmp_dir = tempfile::tempdir().unwrap();
1474        let dir = MmapDirectory::new(tmp_dir.path());
1475
1476        let mut schema_builder = SchemaBuilder::default();
1477        let title = schema_builder.add_text_field("title", true, true);
1478        let schema = schema_builder.build();
1479
1480        let config = IndexConfig {
1481            max_indexing_memory_bytes: 4096,
1482            ..Default::default()
1483        };
1484
1485        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1486            .await
1487            .unwrap();
1488
1489        let total_docs = 1200;
1490        let docs_per_batch = 60;
1491        let batches = total_docs / docs_per_batch;
1492
1493        // Each doc has a unique term "uid_N" for verification
1494        for batch in 0..batches {
1495            for i in 0..docs_per_batch {
1496                let doc_num = batch * docs_per_batch + i;
1497                let mut doc = Document::new();
1498                doc.add_text(
1499                    title,
1500                    format!("uid_{} common_term batch_{}", doc_num, batch),
1501                );
1502                writer.add_document(doc).unwrap();
1503            }
1504            writer.commit().await.unwrap();
1505        }
1506
1507        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1508        assert!(
1509            pre_segments >= 2,
1510            "Need multiple segments, got {}",
1511            pre_segments
1512        );
1513
1514        // Force merge to single segment
1515        writer.force_merge().await.unwrap();
1516
1517        let index = Index::open(dir, config).await.unwrap();
1518        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1519        assert_eq!(
1520            index.num_docs().await.unwrap(),
1521            total_docs as u32,
1522            "Doc count mismatch after force_merge"
1523        );
1524
1525        // Verify every unique document is searchable
1526        let results = index.query("common_term", total_docs + 100).await.unwrap();
1527        assert_eq!(
1528            results.hits.len(),
1529            total_docs,
1530            "common_term should match all docs"
1531        );
1532
1533        // Spot-check unique IDs across the range
1534        for check in [0, 1, total_docs / 2, total_docs - 1] {
1535            let q = format!("uid_{}", check);
1536            let results = index.query(&q, 10).await.unwrap();
1537            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1538        }
1539    }
1540
1541    /// Multi-round commit+merge: verify doc count grows correctly
1542    /// and no documents are lost across multiple merge cycles.
1543    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1544    async fn test_multi_round_merge_doc_integrity() {
1545        use crate::directories::MmapDirectory;
1546        let tmp_dir = tempfile::tempdir().unwrap();
1547        let dir = MmapDirectory::new(tmp_dir.path());
1548
1549        let mut schema_builder = SchemaBuilder::default();
1550        let title = schema_builder.add_text_field("title", true, true);
1551        let schema = schema_builder.build();
1552
1553        let config = IndexConfig {
1554            max_indexing_memory_bytes: 4096,
1555            num_indexing_threads: 2,
1556            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1557            ..Default::default()
1558        };
1559
1560        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1561            .await
1562            .unwrap();
1563
1564        let mut expected_total = 0u64;
1565
1566        // 4 rounds of: add docs → commit → wait for merges → verify count
1567        for round in 0..4 {
1568            let docs_this_round = 50 + round * 25; // 50, 75, 100, 125
1569            for batch in 0..5 {
1570                for i in 0..docs_this_round / 5 {
1571                    let mut doc = Document::new();
1572                    doc.add_text(
1573                        title,
1574                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
1575                    );
1576                    writer.add_document(doc).unwrap();
1577                }
1578                writer.commit().await.unwrap();
1579            }
1580            writer.wait_for_merging_thread().await;
1581
1582            expected_total += docs_this_round as u64;
1583
1584            let actual = writer
1585                .segment_manager
1586                .read_metadata(|m| {
1587                    m.segment_metas
1588                        .values()
1589                        .map(|s| s.num_docs as u64)
1590                        .sum::<u64>()
1591                })
1592                .await;
1593
1594            assert_eq!(
1595                actual, expected_total,
1596                "Round {}: expected {} docs, metadata reports {}",
1597                round, expected_total, actual
1598            );
1599        }
1600
1601        // Final verify: open fresh and query
1602        let index = Index::open(dir, config).await.unwrap();
1603        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);
1604
1605        let results = index
1606            .query("searchable", expected_total as usize + 100)
1607            .await
1608            .unwrap();
1609        assert_eq!(
1610            results.hits.len(),
1611            expected_total as usize,
1612            "All docs should match 'searchable'"
1613        );
1614
1615        // Check generation grew across rounds
1616        let metas = index
1617            .segment_manager()
1618            .read_metadata(|m| m.segment_metas.clone())
1619            .await;
1620        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1621        eprintln!(
1622            "Final: {} segments, {} docs, max generation={}",
1623            metas.len(),
1624            expected_total,
1625            max_gen
1626        );
1627        assert!(
1628            max_gen >= 1,
1629            "Multiple merge rounds should produce gen >= 1"
1630        );
1631    }
1632}