// hermes_core/index/mod.rs

//! Index - multi-segment async search index
//!
//! The `Index` is the central concept that provides:
//! - `Index::create()` / `Index::open()` - create or open an index
//! - `index.writer()` - get an IndexWriter for adding documents
//! - `index.reader()` - get an IndexReader for searching (with reload policy)
//!
//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
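//!
//! # Example
//!
//! A minimal end-to-end sketch, mirroring the tests at the bottom of this file
//! (the directory type and field names are illustrative):
//!
//! ```ignore
//! let mut builder = SchemaBuilder::default();
//! let title = builder.add_text_field("title", true, true);
//! let schema = builder.build();
//!
//! let index = Index::create(RamDirectory::new(), schema, IndexConfig::default()).await?;
//! let mut writer = index.writer();
//!
//! let mut doc = Document::new();
//! doc.add_text(title, "Hello World");
//! writer.add_document(doc)?;
//! writer.commit().await?;
//!
//! let results = index.query("hello", 10).await?;
//! ```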

#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;

mod searcher;
pub use searcher::Searcher;

#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

/// Default file name for the slice cache
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

/// Index configuration
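///
/// Most callers override only a few fields; a sketch using struct update
/// syntax, as the tests below do:
///
/// ```ignore
/// let config = IndexConfig {
///     max_indexing_memory_bytes: 64 * 1024 * 1024, // 64 MB indexing budget
///     reload_interval_ms: 500,
///     ..Default::default()
/// };
/// ```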
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max memory (bytes) across all builders before auto-commit (global limit)
    pub max_indexing_memory_bytes: usize,
    /// Merge policy for background segment merging
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
    pub reload_interval_ms: u64,
    /// Maximum number of concurrent background merges (default: 4)
    pub max_concurrent_merges: usize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let indexing_threads = crate::default_indexing_threads();
        #[cfg(not(feature = "native"))]
        let indexing_threads = 1;

        #[cfg(feature = "native")]
        let compression_threads = crate::default_compression_threads();
        #[cfg(not(feature = "native"))]
        let compression_threads = 1;

        Self {
            num_threads: indexing_threads,
            num_indexing_threads: 1,
            num_compression_threads: compression_threads,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
            reload_interval_ms: 1000, // 1 second default
            max_concurrent_merges: 4,
        }
    }
}

/// Multi-segment async Index
///
/// The central concept for search. Owns segment lifecycle and provides:
/// - `Index::create()` / `Index::open()` - create or open an index
/// - `index.writer()` - get an IndexWriter for adding documents
/// - `index.reader()` - get an IndexReader for searching with reload policy
///
/// All segment management is delegated to SegmentManager.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Segment manager - owns segments, tracker, metadata, and trained structures
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Cached reader (created lazily, reused across calls)
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Open an existing index from a directory
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load metadata (includes schema)
        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Load trained structures into SegmentManager's ArcSwap
        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get a reference to the underlying directory
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Get the segment manager
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

    /// Get an IndexReader for searching (with reload policy)
    ///
    /// The reader is cached and reused across calls. The reader's internal
    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
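    ///
    /// A typical read path (sketch):
    ///
    /// ```ignore
    /// let reader = index.reader().await?;
    /// let searcher = reader.searcher().await?;
    /// println!("{} docs", searcher.num_docs());
    /// ```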
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    /// Get the config
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    /// Get segment readers for query execution (convenience method)
    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    /// Total number of documents across all segments
    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

    /// Create a query parser for this index
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

    /// Parse and search using a query string
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Query with offset for pagination
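    ///
    /// ```ignore
    /// // Sketch: second page of ten results (limit = 10, offset = 10).
    /// let page2 = index.query_offset("rust", 10, 10).await?;
    /// ```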
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

    /// Search and return results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Search with offset for pagination
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

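        // Each segment must return offset + limit candidates so the globally
        // merged top-k stays correct for any requested page.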
        let fetch_limit = offset + limit;

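        // Fan out to every segment concurrently, tagging each hit with its
        // segment id so a global DocAddress can be built below.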
        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

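        // Merge phase: order all candidates by descending score, then apply paging.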
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get a document by its unique address
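    ///
    /// ```ignore
    /// // Sketch: fetch the stored document behind the first hit.
    /// let doc = index.get_document(&results.hits[0].address).await?;
    /// ```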
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

    /// Get posting lists for a term across all segments
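    ///
    /// ```ignore
    /// // Sketch: raw postings for the term "world" in a text field.
    /// let postings = index.get_postings(title, b"world").await?;
    /// for (_segment, list) in &postings {
    ///     println!("{} matching docs", list.doc_count());
    /// }
    /// ```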
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}

/// Native-only methods for Index
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Get an IndexWriter for adding documents
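    ///
    /// ```ignore
    /// // Sketch: obtain a writer, add a document, and commit.
    /// let mut writer = index.writer();
    /// writer.add_document(doc)?;
    /// writer.commit().await?;
    /// ```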
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::{Document, SchemaBuilder};

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Check postings
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document via searcher snapshot
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add documents in batches to create multiple segments
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Open and check
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        // With queue-based indexing, exact segment count varies
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments by flushing between batches
        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Should have multiple segments (at least 2, one per flush with docs)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        // Force merge
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        // Verify all documents accessible (order may vary with queue-based indexing)
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // Test match query with multiple default fields
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        // Test match query with multiple tokens
        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Verify hit has address (segment_id + doc_id)
        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Verify document retrieval by address
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // Also verify doc retrieval via searcher snapshot
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with some documents
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Open with slice caching and perform some operations to warm up cache
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        // Perform a search to warm up the cache
        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Check cache stats - should have cached some data
        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add document with multi-value field
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        // Add another document with different uris
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Verify document retrieval preserves all values
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let doc = searcher.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // Verify to_json returns array for multi-value field
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Verify both values are searchable
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        // Verify searching for non-existent value returns no results
        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
    ///
    /// This test verifies that:
    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
    /// 2. Search results are correct regardless of WAND optimization
    /// 3. Scores are reasonable for matching documents
    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with documents containing various terms
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Doc 0: contains "rust" and "programming"
        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        // Doc 1: contains "rust" only
        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        // Doc 2: contains "programming" only
        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        // Doc 3: contains "python" (neither rust nor programming)
        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        // Doc 4: contains both "rust" and "programming" multiple times
        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        // Test 2: Single term query (should NOT use WAND, but still work)
        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 3: Query with MUST (should NOT use WAND)
        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        // Must have "rust", optionally "programming"
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 4: Query with MUST_NOT (should NOT use WAND)
        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        // Should exclude docs with "systems" (doc 1 and 4)
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // Test 5: Verify top-k limit works correctly with WAND
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");

        // Top results should be docs that match both terms (higher scores)
        // Doc 0 and 4 contain both "rust" and "programming"
    }

    /// Test that WAND optimization produces same results as non-WAND for correctness
    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add several documents
        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        // Both should find the same documents
        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        // Create schema with dense vector field configured for IVF-RaBitQ
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true, // indexed
            true, // stored
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4), // Small for test
                nprobe: 2,
                build_threshold: Some(50), // Build when we have 50+ vectors
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Phase 1: Add vectors below threshold (should use Flat index)
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 documents (below threshold of 50)
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            // Simple embedding: eight identical components with value i/30
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Open index and verify it's using Flat (not built yet)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        // Search should work with Flat index
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        // Phase 2: Add more vectors to cross threshold
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 more documents (total 60, above threshold of 50)
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Manually trigger vector index build (no longer auto-triggered by commit)
        writer.build_vector_index().await.unwrap();

        // Reopen index and verify trained structures are loaded
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        // Search should still work
        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Phase 3: Verify calling build_vector_index again is a no-op
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap(); // Should skip training

        // Still built (trained structures present in ArcSwap)
        assert!(writer.segment_manager.trained().is_some());
    }

    /// Multi-round merge: flush many small segments, merge, add more, merge again.
    /// Verifies search correctness for term queries through multiple merge rounds.
    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        // --- Round 1: 5 segments × 10 docs = 50 docs ---
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // Search before merge
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        // --- Merge round 1 ---
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // Search after first merge
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        // Verify all docs retrievable
        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        for i in 0..50 {
            let doc = searcher1.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        // --- Round 2: add 30 more docs in 3 segments ---
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        // Search spans both old merged segment and new segments
        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        // --- Merge round 2 ---
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        // All searches still correct after second merge
        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        // Verify all 80 docs retrievable
        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        for i in 0..80 {
            let doc = searcher2.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }

    /// Large-scale merge: many segments with overlapping terms, verifying
    /// BM25 scoring and doc retrieval after merge.
    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 8 batches × 25 docs = 200 docs total
        // Terms: "common" appears in all, "unique_N" appears in batch N only
        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Verify pre-merge
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        // Each unique_N matches exactly 25 docs
        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        // Force merge
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Verify post-merge: single segment, same results
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        // Verify doc retrieval for every doc
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        for i in 0..total_docs {
            let doc = searcher.doc(i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }

    /// Test that auto-merge is triggered by the merge policy during commit,
    /// without calling force_merge. Uses MmapDirectory and higher parallelism
    /// to reproduce production conditions.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_auto_merge_triggered() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        // Aggressive policy: merge when 3 segments in same tier
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
        for batch in 0..12 {
            for i in 0..50 {
                let mut doc = Document::new();
                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
                doc.add_text(
                    body,
                    format!(
                        "the quick brown fox jumps over lazy dog number {} round {}",
                        i, batch
                    ),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_merge = writer.segment_manager.get_segment_ids().await.len();

        // wait_for_merging_thread waits for the single in-flight merge. After it completes,
        // re-evaluate since segments accumulated while the merge was running.
        writer.wait_for_merging_thread().await;
        writer.maybe_merge().await;
        writer.wait_for_merging_thread().await;

        // After commit + auto-merge, segment count should be reduced
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let segment_count = index.segment_readers().await.unwrap().len();
        eprintln!(
            "Segments: {} before merge, {} after auto-merge",
            pre_merge, segment_count
        );
        assert!(
            segment_count < pre_merge,
            "Expected auto-merge to reduce segments from {}, got {}",
            pre_merge,
            segment_count
        );
    }

    /// Regression test: commit with dense vector fields + aggressive merge policy.
    /// Exercises the race where background merge deletes segment files while
    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
    /// Before the fix, this would fail with "IO error: No such file or directory".
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        // RaBitQ with very low build_threshold so vector index building triggers during commit
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        // Aggressive merge: triggers background merges at 3 segments per tier
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 12 segments with vectors — enough to trigger both
        // background merges (aggressive policy) and vector index building (threshold=10)
        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                // 8-dim deterministic vector derived from i and batch
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1338                doc.add_dense_vector(embedding, vec);
1339                writer.add_document(doc).unwrap();
1340            }
1341            writer.commit().await.unwrap();
1342        }
1343        writer.wait_for_merging_thread().await;
1344
1345        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1346        let num_docs = index.num_docs().await.unwrap();
1347        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1348    }

    /// Stress test: force_merge with many segments (iterative batching).
    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
    /// (The batching arithmetic is sketched in `merge_passes_needed` after this test.)
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_force_merge_many_segments() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 50 tiny segments
        for batch in 0..50 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("term_{} batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        // Wait for background merges before reading segment count
        writer.wait_for_merging_thread().await;

        let seg_ids = writer.segment_manager.get_segment_ids().await;
        let pre = seg_ids.len();
        eprintln!("Segments before force_merge: {}", pre);
        assert!(pre >= 2, "Expected multiple segments, got {}", pre);

        // Force merge all segments into one — should iterate in batches, not OOM
        writer.force_merge().await.unwrap();

        let index2 = Index::open(dir, config).await.unwrap();
        let post = index2.segment_readers().await.unwrap().len();
        eprintln!("Segments after force_merge: {}", post);
        assert_eq!(post, 1);
        assert_eq!(index2.num_docs().await.unwrap(), 150);
    }
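
    /// Sketch of the iterative batching idea behind `force_merge` (assumed
    /// behavior, not the writer's actual implementation): merge at most
    /// `max_at_once` inputs per pass, feeding each pass's output back in, so
    /// memory and open file descriptors stay bounded regardless of how many
    /// segments exist. Returns the number of passes needed to reach one segment.
    #[allow(dead_code)]
    fn merge_passes_needed(num_segments: usize, max_at_once: usize) -> usize {
        assert!(max_at_once >= 2);
        let mut remaining = num_segments;
        let mut passes = 0;
        while remaining > 1 {
            let batch = remaining.min(max_at_once);
            remaining = remaining - batch + 1; // `batch` inputs collapse into one output
            passes += 1;
        }
        passes // e.g. 50 segments with max_at_once = 5 → 13 passes
    }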

    /// Test that background merges produce correct generation metadata.
    /// Creates many segments with aggressive policy, commits, waits for merges,
    /// and verifies that merged segments have generation >= 1 with correct ancestors.
    /// (The invariant is extracted as `generation_invariant_holds` after this test.)
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_background_merge_generation() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 15 small segments — enough for aggressive policy to trigger merges
        for batch in 0..15 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }
        writer.wait_for_merging_thread().await;

        // Read metadata and verify generation tracking
        let metas = writer
            .segment_manager
            .read_metadata(|m| m.segment_metas.clone())
            .await;

        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Segments after merge: {}, max generation: {}",
            metas.len(),
            max_gen
        );

        // Background merges should have produced at least one merged segment (gen >= 1)
        assert!(
            max_gen >= 1,
            "Expected at least one merged segment (gen >= 1), got max_gen={}",
            max_gen
        );

        // Every merged segment (gen > 0) must have non-empty ancestors
        for (id, info) in &metas {
            if info.generation > 0 {
                assert!(
                    !info.ancestors.is_empty(),
                    "Segment {} has gen={} but no ancestors",
                    id,
                    info.generation
                );
            } else {
                assert!(
                    info.ancestors.is_empty(),
                    "Fresh segment {} has gen=0 but has ancestors",
                    id
                );
            }
        }
    }
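
    /// The generation/ancestry invariant asserted above, restated as a pure
    /// function (sketch; parameter shapes assumed from the test, not taken from
    /// the crate's metadata types): gen 0 segments are fresh flushes with no
    /// ancestors, while gen >= 1 segments are merge products and must record
    /// what they were merged from.
    #[allow(dead_code)]
    fn generation_invariant_holds(generation: u32, num_ancestors: usize) -> bool {
        // A segment is a merge product if and only if it has ancestors.
        (generation == 0) == (num_ancestors == 0)
    }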

    /// Test that merging preserves every single document.
    /// Indexes 1000+ unique documents across many segments, force-merges,
    /// verifies the exact doc count, and spot-checks that unique terms remain
    /// searchable. (An exhaustive variant, `assert_all_uids_searchable`, follows
    /// this test.)
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_merge_preserves_all_documents() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let total_docs = 1200;
        let docs_per_batch = 60;
        let batches = total_docs / docs_per_batch;

        // Each doc has a unique term "uid_N" for verification
        for batch in 0..batches {
            for i in 0..docs_per_batch {
                let doc_num = batch * docs_per_batch + i;
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("uid_{} common_term batch_{}", doc_num, batch),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
        assert!(
            pre_segments >= 2,
            "Need multiple segments, got {}",
            pre_segments
        );

        // Force merge to single segment
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(
            index.num_docs().await.unwrap(),
            total_docs as u32,
            "Doc count mismatch after force_merge"
        );

        // Verify the shared term reaches every document
        let results = index.query("common_term", total_docs + 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs,
            "common_term should match all docs"
        );

        // Spot-check unique IDs across the range
        for check in [0, 1, total_docs / 2, total_docs - 1] {
            let q = format!("uid_{}", check);
            let results = index.query(&q, 10).await.unwrap();
            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
        }
    }
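
    /// Exhaustive variant of the spot-check above (sketch, assuming the same
    /// `Index` handle used by `Index::open` in these tests; if `Index` is
    /// generic over the directory type, add that parameter). One query per
    /// document, so it is slow, but useful when hunting a suspected merge
    /// data-loss bug.
    #[allow(dead_code)]
    async fn assert_all_uids_searchable(index: &Index, total_docs: usize) {
        for n in 0..total_docs {
            let q = format!("uid_{}", n);
            let results = index.query(&q, 10).await.unwrap();
            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
        }
    }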

    /// Multi-round commit+merge: verify doc count grows correctly
    /// and no documents are lost across multiple merge cycles.
    /// (The round arithmetic is sketched in `expected_docs_after_rounds` after
    /// this test.)
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multi_round_merge_doc_integrity() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 2,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut expected_total = 0u64;

        // 4 rounds of: add docs → commit → wait for merges → verify count
        for round in 0..4 {
            let docs_this_round = 50 + round * 25; // 50, 75, 100, 125
            for batch in 0..5 {
                for i in 0..docs_this_round / 5 {
                    let mut doc = Document::new();
                    doc.add_text(
                        title,
                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
                    );
                    writer.add_document(doc).unwrap();
                }
                writer.commit().await.unwrap();
            }
            writer.wait_for_merging_thread().await;

            expected_total += docs_this_round as u64;

            let actual = writer
                .segment_manager
                .read_metadata(|m| {
                    m.segment_metas
                        .values()
                        .map(|s| s.num_docs as u64)
                        .sum::<u64>()
                })
                .await;

            assert_eq!(
                actual, expected_total,
                "Round {}: expected {} docs, metadata reports {}",
                round, expected_total, actual
            );
        }

        // Final verification: open a fresh index and query
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);

        let results = index
            .query("searchable", expected_total as usize + 100)
            .await
            .unwrap();
        assert_eq!(
            results.hits.len(),
            expected_total as usize,
            "All docs should match 'searchable'"
        );

        // Check generation grew across rounds
        let metas = index
            .segment_manager()
            .read_metadata(|m| m.segment_metas.clone())
            .await;
        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
        eprintln!(
            "Final: {} segments, {} docs, max generation={}",
            metas.len(),
            expected_total,
            max_gen
        );
        assert!(
            max_gen >= 1,
            "Multiple merge rounds should produce gen >= 1"
        );
    }
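
    /// Round sizes above follow 50 + 25 * round; the final expectation is their
    /// sum. A small, illustrative check of the arithmetic the test relies on:
    #[allow(dead_code)]
    fn expected_docs_after_rounds(rounds: u64) -> u64 {
        (0..rounds).map(|r| 50 + r * 25).sum() // 4 rounds → 50 + 75 + 100 + 125 = 350
    }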

    /// Sustained indexing: verify segment count stays O(log N) bounded.
    ///
    /// Indexes many small batches with a tight tiered merge policy and checks
    /// that the segment count never grows unbounded. With tiered merging the
    /// count should stay roughly O(segments_per_tier * num_tiers) ≈ O(log N).
    /// (See `approx_segment_bound` after this test for the arithmetic.)
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_segment_count_bounded_during_sustained_indexing() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, false);
        let schema = schema_builder.build();

        let policy = crate::merge::TieredMergePolicy {
            segments_per_tier: 3,
            max_merge_at_once: 5,
            tier_factor: 10.0,
            tier_floor: 50,
            max_merged_docs: 1_000_000,
        };

        let config = IndexConfig {
            max_indexing_memory_bytes: 4096, // tiny budget → frequent flushes
            num_indexing_threads: 1,
            merge_policy: Box::new(policy),
            max_concurrent_merges: 4,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let num_commits = 40;
        let docs_per_commit = 30;
        let total_docs = num_commits * docs_per_commit;
        let mut max_segments_seen = 0usize;

        for commit_idx in 0..num_commits {
            for i in 0..docs_per_commit {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("doc_{} text", commit_idx * docs_per_commit + i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();

            // Give background merges a moment to run
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let seg_count = writer.segment_manager.get_segment_ids().await.len();
            max_segments_seen = max_segments_seen.max(seg_count);
        }

        // Wait for all merges to finish
        writer.wait_for_all_merges().await;

        let final_segments = writer.segment_manager.get_segment_ids().await.len();
        let final_docs: u64 = writer
            .segment_manager
            .read_metadata(|m| {
                m.segment_metas
                    .values()
                    .map(|s| s.num_docs as u64)
                    .sum::<u64>()
            })
            .await;

        eprintln!(
            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
            num_commits, total_docs, final_segments, max_segments_seen
        );

        // With 1200 docs and segments_per_tier=3, tier_floor=50:
        // tier 0: ≤50 docs, tier 1: 50-500, tier 2: 500-5000
        // We should have at most ~3 segments per tier * ~3 tiers ≈ 9-12 segments at peak.
        // The key invariant: segment count must NOT grow linearly with commits.
        // 40 commits should NOT produce 40 segments.
        let max_allowed = num_commits / 2; // generous: at most half the commits as segments
        assert!(
            max_segments_seen <= max_allowed,
            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
             Merging is not keeping up.",
            max_segments_seen,
            max_allowed,
            num_commits
        );

        // After all merges complete, should be well under the limit
        assert!(
            final_segments <= 10,
            "After all merges, expected ≤10 segments, got {}",
            final_segments
        );

        // No data loss
        assert_eq!(
            final_docs, total_docs as u64,
            "Expected {} docs, metadata reports {}",
            total_docs, final_docs
        );
    }
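
    /// Back-of-envelope version of the tier math in the test above (illustrative
    /// only; the real policy is `crate::merge::TieredMergePolicy`): with tier
    /// sizes growing by `tier_factor` from `tier_floor`, N docs span roughly
    /// log_factor(N / floor) + 1 tiers, each holding at most `segments_per_tier`
    /// segments before a merge fires.
    #[allow(dead_code)]
    fn approx_segment_bound(
        total_docs: f64,
        tier_floor: f64,
        tier_factor: f64,
        segments_per_tier: usize,
    ) -> usize {
        // 1200 docs, floor 50, factor 10 → ceil(log10(24)) + 1 = 3 tiers → 3 * 3 = 9
        let tiers = (total_docs / tier_floor).log(tier_factor).ceil().max(0.0) as usize + 1;
        segments_per_tier * tiers
    }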
}