Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39    index_documents_from_reader, index_json_document, parse_schema,
40};
41
42/// Default file name for the slice cache
43pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
45/// Index configuration
46#[derive(Debug, Clone)]
47pub struct IndexConfig {
48    /// Number of threads for CPU-intensive tasks (search parallelism)
49    pub num_threads: usize,
50    /// Number of parallel segment builders (documents distributed round-robin)
51    pub num_indexing_threads: usize,
52    /// Number of threads for parallel block compression within each segment
53    pub num_compression_threads: usize,
54    /// Block cache size for term dictionary per segment
55    pub term_cache_blocks: usize,
56    /// Block cache size for document store per segment
57    pub store_cache_blocks: usize,
58    /// Max memory (bytes) across all builders before auto-commit (global limit)
59    pub max_indexing_memory_bytes: usize,
60    /// Merge policy for background segment merging
61    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
62    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
63    pub optimization: crate::structures::IndexOptimization,
64    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
65    pub reload_interval_ms: u64,
66    /// Maximum number of concurrent background merges (default: 4)
67    pub max_concurrent_merges: usize,
68}
69
70impl Default for IndexConfig {
71    fn default() -> Self {
72        #[cfg(feature = "native")]
73        let indexing_threads = crate::default_indexing_threads();
74        #[cfg(not(feature = "native"))]
75        let indexing_threads = 1;
76
77        #[cfg(feature = "native")]
78        let compression_threads = crate::default_compression_threads();
79        #[cfg(not(feature = "native"))]
80        let compression_threads = 1;
81
82        Self {
83            num_threads: indexing_threads,
84            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
85            num_compression_threads: compression_threads,
86            term_cache_blocks: 256,
87            store_cache_blocks: 32,
88            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
89            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90            optimization: crate::structures::IndexOptimization::default(),
91            reload_interval_ms: 1000, // 1 second default
92            max_concurrent_merges: 4,
93        }
94    }
95}
96
97/// Multi-segment async Index
98///
99/// The central concept for search. Owns segment lifecycle and provides:
100/// - `Index::create()` / `Index::open()` - create or open an index
101/// - `index.writer()` - get an IndexWriter for adding documents
102/// - `index.reader()` - get an IndexReader for searching with reload policy
103///
104/// All segment management is delegated to SegmentManager.
105#[cfg(feature = "native")]
106pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
107    directory: Arc<D>,
108    schema: Arc<Schema>,
109    config: IndexConfig,
110    /// Segment manager - owns segments, tracker, metadata, and trained structures
111    segment_manager: Arc<crate::merge::SegmentManager<D>>,
112    /// Cached reader (created lazily, reused across calls)
113    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
114}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118    /// Create a new index in the directory
119    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120        let directory = Arc::new(directory);
121        let schema = Arc::new(schema);
122        let metadata = IndexMetadata::new((*schema).clone());
123
124        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125            Arc::clone(&directory),
126            Arc::clone(&schema),
127            metadata,
128            config.merge_policy.clone_box(),
129            config.term_cache_blocks,
130            config.max_concurrent_merges,
131        ));
132
133        // Save initial metadata
134        segment_manager.update_metadata(|_| {}).await?;
135
136        Ok(Self {
137            directory,
138            schema,
139            config,
140            segment_manager,
141            cached_reader: tokio::sync::OnceCell::new(),
142        })
143    }
144
145    /// Open an existing index from a directory
146    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
147        let directory = Arc::new(directory);
148
149        // Load metadata (includes schema)
150        let metadata = IndexMetadata::load(directory.as_ref()).await?;
151        let schema = Arc::new(metadata.schema.clone());
152
153        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
154            Arc::clone(&directory),
155            Arc::clone(&schema),
156            metadata,
157            config.merge_policy.clone_box(),
158            config.term_cache_blocks,
159            config.max_concurrent_merges,
160        ));
161
162        // Load trained structures into SegmentManager's ArcSwap
163        segment_manager.load_and_publish_trained().await;
164
165        Ok(Self {
166            directory,
167            schema,
168            config,
169            segment_manager,
170            cached_reader: tokio::sync::OnceCell::new(),
171        })
172    }
173
174    /// Get the schema
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178
179    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
180    pub fn schema_arc(&self) -> &Arc<Schema> {
181        &self.schema
182    }
183
184    /// Get a reference to the underlying directory
185    pub fn directory(&self) -> &D {
186        &self.directory
187    }
188
189    /// Get the segment manager
190    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
191        &self.segment_manager
192    }
193
194    /// Get an IndexReader for searching (with reload policy)
195    ///
196    /// The reader is cached and reused across calls. The reader's internal
197    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
198    pub async fn reader(&self) -> Result<&IndexReader<D>> {
199        self.cached_reader
200            .get_or_try_init(|| async {
201                IndexReader::from_segment_manager(
202                    Arc::clone(&self.schema),
203                    Arc::clone(&self.segment_manager),
204                    self.config.term_cache_blocks,
205                    self.config.reload_interval_ms,
206                )
207                .await
208            })
209            .await
210    }
211
212    /// Get the config
213    pub fn config(&self) -> &IndexConfig {
214        &self.config
215    }
216
217    /// Get segment readers for query execution (convenience method)
218    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
219        let reader = self.reader().await?;
220        let searcher = reader.searcher().await?;
221        Ok(searcher.segment_readers().to_vec())
222    }
223
224    /// Total number of documents across all segments
225    pub async fn num_docs(&self) -> Result<u32> {
226        let reader = self.reader().await?;
227        let searcher = reader.searcher().await?;
228        Ok(searcher.num_docs())
229    }
230
231    /// Get default fields for search
232    pub fn default_fields(&self) -> Vec<crate::Field> {
233        if !self.schema.default_fields().is_empty() {
234            self.schema.default_fields().to_vec()
235        } else {
236            self.schema
237                .fields()
238                .filter(|(_, entry)| {
239                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240                })
241                .map(|(field, _)| field)
242                .collect()
243        }
244    }
245
246    /// Get tokenizer registry
247    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
248        Arc::new(crate::tokenizer::TokenizerRegistry::default())
249    }
250
251    /// Create a query parser for this index
252    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
253        let default_fields = self.default_fields();
254        let tokenizers = self.tokenizers();
255
256        let query_routers = self.schema.query_routers();
257        if !query_routers.is_empty()
258            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
259        {
260            return crate::dsl::QueryLanguageParser::with_router(
261                Arc::clone(&self.schema),
262                default_fields,
263                tokenizers,
264                router,
265            );
266        }
267
268        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
269    }
270
271    /// Parse and search using a query string
272    pub async fn query(
273        &self,
274        query_str: &str,
275        limit: usize,
276    ) -> Result<crate::query::SearchResponse> {
277        self.query_offset(query_str, limit, 0).await
278    }
279
280    /// Query with offset for pagination
281    pub async fn query_offset(
282        &self,
283        query_str: &str,
284        limit: usize,
285        offset: usize,
286    ) -> Result<crate::query::SearchResponse> {
287        let parser = self.query_parser();
288        let query = parser
289            .parse(query_str)
290            .map_err(crate::error::Error::Query)?;
291        self.search_offset(query.as_ref(), limit, offset).await
292    }
293
294    /// Search and return results
295    pub async fn search(
296        &self,
297        query: &dyn crate::query::Query,
298        limit: usize,
299    ) -> Result<crate::query::SearchResponse> {
300        self.search_offset(query, limit, 0).await
301    }
302
303    /// Search with offset for pagination
304    pub async fn search_offset(
305        &self,
306        query: &dyn crate::query::Query,
307        limit: usize,
308        offset: usize,
309    ) -> Result<crate::query::SearchResponse> {
310        let reader = self.reader().await?;
311        let searcher = reader.searcher().await?;
312        let segments = searcher.segment_readers();
313
314        let fetch_limit = offset + limit;
315
316        let futures: Vec<_> = segments
317            .iter()
318            .map(|segment| {
319                let sid = segment.meta().id;
320                async move {
321                    let results =
322                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
323                    Ok::<_, crate::error::Error>(
324                        results
325                            .into_iter()
326                            .map(move |r| (sid, r))
327                            .collect::<Vec<_>>(),
328                    )
329                }
330            })
331            .collect();
332
333        let batches = futures::future::try_join_all(futures).await?;
334        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
335            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
336        for batch in batches {
337            all_results.extend(batch);
338        }
339
340        all_results.sort_by(|a, b| {
341            b.1.score
342                .partial_cmp(&a.1.score)
343                .unwrap_or(std::cmp::Ordering::Equal)
344        });
345
346        let total_hits = all_results.len() as u32;
347
348        let hits: Vec<crate::query::SearchHit> = all_results
349            .into_iter()
350            .skip(offset)
351            .take(limit)
352            .map(|(segment_id, result)| crate::query::SearchHit {
353                address: crate::query::DocAddress::new(segment_id, result.doc_id),
354                score: result.score,
355                matched_fields: result.extract_ordinals(),
356            })
357            .collect();
358
359        Ok(crate::query::SearchResponse { hits, total_hits })
360    }
361
362    /// Get a document by its unique address
363    pub async fn get_document(
364        &self,
365        address: &crate::query::DocAddress,
366    ) -> Result<Option<crate::dsl::Document>> {
367        let reader = self.reader().await?;
368        let searcher = reader.searcher().await?;
369        searcher.get_document(address).await
370    }
371
372    /// Get posting lists for a term across all segments
373    pub async fn get_postings(
374        &self,
375        field: crate::Field,
376        term: &[u8],
377    ) -> Result<
378        Vec<(
379            Arc<crate::segment::SegmentReader>,
380            crate::structures::BlockPostingList,
381        )>,
382    > {
383        let segments = self.segment_readers().await?;
384        let mut results = Vec::new();
385
386        for segment in segments {
387            if let Some(postings) = segment.get_postings(field, term).await? {
388                results.push((segment, postings));
389            }
390        }
391
392        Ok(results)
393    }
394}
395
396/// Native-only methods for Index
397#[cfg(feature = "native")]
398impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
399    /// Get an IndexWriter for adding documents
400    pub fn writer(&self) -> writer::IndexWriter<D> {
401        writer::IndexWriter::from_index(self)
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use crate::directories::RamDirectory;
409    use crate::dsl::{Document, SchemaBuilder};
410
411    #[tokio::test]
412    async fn test_index_create_and_search() {
413        let mut schema_builder = SchemaBuilder::default();
414        let title = schema_builder.add_text_field("title", true, true);
415        let body = schema_builder.add_text_field("body", true, true);
416        let schema = schema_builder.build();
417
418        let dir = RamDirectory::new();
419        let config = IndexConfig::default();
420
421        // Create index and add documents
422        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
423            .await
424            .unwrap();
425
426        let mut doc1 = Document::new();
427        doc1.add_text(title, "Hello World");
428        doc1.add_text(body, "This is the first document");
429        writer.add_document(doc1).unwrap();
430
431        let mut doc2 = Document::new();
432        doc2.add_text(title, "Goodbye World");
433        doc2.add_text(body, "This is the second document");
434        writer.add_document(doc2).unwrap();
435
436        writer.commit().await.unwrap();
437
438        // Open for reading
439        let index = Index::open(dir, config).await.unwrap();
440        assert_eq!(index.num_docs().await.unwrap(), 2);
441
442        // Check postings
443        let postings = index.get_postings(title, b"world").await.unwrap();
444        assert_eq!(postings.len(), 1); // One segment
445        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
446
447        // Retrieve document via searcher snapshot
448        let reader = index.reader().await.unwrap();
449        let searcher = reader.searcher().await.unwrap();
450        let doc = searcher.doc(0).await.unwrap().unwrap();
451        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
452    }
453
454    #[tokio::test]
455    async fn test_multiple_segments() {
456        let mut schema_builder = SchemaBuilder::default();
457        let title = schema_builder.add_text_field("title", true, true);
458        let schema = schema_builder.build();
459
460        let dir = RamDirectory::new();
461        let config = IndexConfig {
462            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
463            ..Default::default()
464        };
465
466        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
467            .await
468            .unwrap();
469
470        // Add documents in batches to create multiple segments
471        for batch in 0..3 {
472            for i in 0..5 {
473                let mut doc = Document::new();
474                doc.add_text(title, format!("Document {} batch {}", i, batch));
475                writer.add_document(doc).unwrap();
476            }
477            writer.commit().await.unwrap();
478        }
479
480        // Open and check
481        let index = Index::open(dir, config).await.unwrap();
482        assert_eq!(index.num_docs().await.unwrap(), 15);
483        // With queue-based indexing, exact segment count varies
484        assert!(
485            index.segment_readers().await.unwrap().len() >= 2,
486            "Expected multiple segments"
487        );
488    }
489
490    #[tokio::test]
491    async fn test_segment_merge() {
492        let mut schema_builder = SchemaBuilder::default();
493        let title = schema_builder.add_text_field("title", true, true);
494        let schema = schema_builder.build();
495
496        let dir = RamDirectory::new();
497        let config = IndexConfig {
498            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
499            ..Default::default()
500        };
501
502        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
503            .await
504            .unwrap();
505
506        // Create multiple segments by flushing between batches
507        for batch in 0..3 {
508            for i in 0..3 {
509                let mut doc = Document::new();
510                doc.add_text(title, format!("Document {} batch {}", i, batch));
511                writer.add_document(doc).unwrap();
512            }
513            writer.commit().await.unwrap();
514        }
515
516        // Should have multiple segments (at least 2, one per flush with docs)
517        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
518        assert!(
519            index.segment_readers().await.unwrap().len() >= 2,
520            "Expected multiple segments"
521        );
522
523        // Force merge
524        let mut writer = IndexWriter::open(dir.clone(), config.clone())
525            .await
526            .unwrap();
527        writer.force_merge().await.unwrap();
528
529        // Should have 1 segment now
530        let index = Index::open(dir, config).await.unwrap();
531        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
532        assert_eq!(index.num_docs().await.unwrap(), 9);
533
534        // Verify all documents accessible (order may vary with queue-based indexing)
535        let reader = index.reader().await.unwrap();
536        let searcher = reader.searcher().await.unwrap();
537        let mut found_docs = 0;
538        for i in 0..9 {
539            if searcher.doc(i).await.unwrap().is_some() {
540                found_docs += 1;
541            }
542        }
543        assert_eq!(found_docs, 9);
544    }
545
546    #[tokio::test]
547    async fn test_match_query() {
548        let mut schema_builder = SchemaBuilder::default();
549        let title = schema_builder.add_text_field("title", true, true);
550        let body = schema_builder.add_text_field("body", true, true);
551        let schema = schema_builder.build();
552
553        let dir = RamDirectory::new();
554        let config = IndexConfig::default();
555
556        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
557            .await
558            .unwrap();
559
560        let mut doc1 = Document::new();
561        doc1.add_text(title, "rust programming");
562        doc1.add_text(body, "Learn rust language");
563        writer.add_document(doc1).unwrap();
564
565        let mut doc2 = Document::new();
566        doc2.add_text(title, "python programming");
567        doc2.add_text(body, "Learn python language");
568        writer.add_document(doc2).unwrap();
569
570        writer.commit().await.unwrap();
571
572        let index = Index::open(dir, config).await.unwrap();
573
574        // Test match query with multiple default fields
575        let results = index.query("rust", 10).await.unwrap();
576        assert_eq!(results.hits.len(), 1);
577
578        // Test match query with multiple tokens
579        let results = index.query("rust programming", 10).await.unwrap();
580        assert!(!results.hits.is_empty());
581
582        // Verify hit has address (segment_id + doc_id)
583        let hit = &results.hits[0];
584        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
585
586        // Verify document retrieval by address
587        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
588        assert!(
589            !doc.field_values().is_empty(),
590            "Doc should have field values"
591        );
592
593        // Also verify doc retrieval via searcher snapshot
594        let reader = index.reader().await.unwrap();
595        let searcher = reader.searcher().await.unwrap();
596        let doc = searcher.doc(0).await.unwrap().unwrap();
597        assert!(
598            !doc.field_values().is_empty(),
599            "Doc should have field values"
600        );
601    }
602
603    #[tokio::test]
604    async fn test_slice_cache_warmup_and_load() {
605        use crate::directories::SliceCachingDirectory;
606
607        let mut schema_builder = SchemaBuilder::default();
608        let title = schema_builder.add_text_field("title", true, true);
609        let body = schema_builder.add_text_field("body", true, true);
610        let schema = schema_builder.build();
611
612        let dir = RamDirectory::new();
613        let config = IndexConfig::default();
614
615        // Create index with some documents
616        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
617            .await
618            .unwrap();
619
620        for i in 0..10 {
621            let mut doc = Document::new();
622            doc.add_text(title, format!("Document {} about rust", i));
623            doc.add_text(body, format!("This is body text number {}", i));
624            writer.add_document(doc).unwrap();
625        }
626        writer.commit().await.unwrap();
627
628        // Open with slice caching and perform some operations to warm up cache
629        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
630        let index = Index::open(caching_dir, config.clone()).await.unwrap();
631
632        // Perform a search to warm up the cache
633        let results = index.query("rust", 10).await.unwrap();
634        assert!(!results.hits.is_empty());
635
636        // Check cache stats - should have cached some data
637        let stats = index.directory.stats();
638        assert!(stats.total_bytes > 0, "Cache should have data after search");
639    }
640
641    #[tokio::test]
642    async fn test_multivalue_field_indexing_and_search() {
643        let mut schema_builder = SchemaBuilder::default();
644        let uris = schema_builder.add_text_field("uris", true, true);
645        let title = schema_builder.add_text_field("title", true, true);
646        let schema = schema_builder.build();
647
648        let dir = RamDirectory::new();
649        let config = IndexConfig::default();
650
651        // Create index and add document with multi-value field
652        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
653            .await
654            .unwrap();
655
656        let mut doc = Document::new();
657        doc.add_text(uris, "one");
658        doc.add_text(uris, "two");
659        doc.add_text(title, "Test Document");
660        writer.add_document(doc).unwrap();
661
662        // Add another document with different uris
663        let mut doc2 = Document::new();
664        doc2.add_text(uris, "three");
665        doc2.add_text(title, "Another Document");
666        writer.add_document(doc2).unwrap();
667
668        writer.commit().await.unwrap();
669
670        // Open for reading
671        let index = Index::open(dir, config).await.unwrap();
672        assert_eq!(index.num_docs().await.unwrap(), 2);
673
674        // Verify document retrieval preserves all values
675        let reader = index.reader().await.unwrap();
676        let searcher = reader.searcher().await.unwrap();
677        let doc = searcher.doc(0).await.unwrap().unwrap();
678        let all_uris: Vec<_> = doc.get_all(uris).collect();
679        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
680        assert_eq!(all_uris[0].as_text(), Some("one"));
681        assert_eq!(all_uris[1].as_text(), Some("two"));
682
683        // Verify to_json returns array for multi-value field
684        let json = doc.to_json(index.schema());
685        let uris_json = json.get("uris").unwrap();
686        assert!(uris_json.is_array(), "Multi-value field should be an array");
687        let uris_arr = uris_json.as_array().unwrap();
688        assert_eq!(uris_arr.len(), 2);
689        assert_eq!(uris_arr[0].as_str(), Some("one"));
690        assert_eq!(uris_arr[1].as_str(), Some("two"));
691
692        // Verify both values are searchable
693        let results = index.query("uris:one", 10).await.unwrap();
694        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
695        assert_eq!(results.hits[0].address.doc_id, 0);
696
697        let results = index.query("uris:two", 10).await.unwrap();
698        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
699        assert_eq!(results.hits[0].address.doc_id, 0);
700
701        let results = index.query("uris:three", 10).await.unwrap();
702        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
703        assert_eq!(results.hits[0].address.doc_id, 1);
704
705        // Verify searching for non-existent value returns no results
706        let results = index.query("uris:nonexistent", 10).await.unwrap();
707        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
708    }
709
710    /// Comprehensive test for MaxScore optimization in BooleanQuery OR queries
711    ///
712    /// This test verifies that:
713    /// 1. BooleanQuery with multiple SHOULD term queries uses MaxScore automatically
714    /// 2. Search results are correct regardless of MaxScore optimization
715    /// 3. Scores are reasonable for matching documents
716    #[tokio::test]
717    async fn test_maxscore_optimization_for_or_queries() {
718        use crate::query::{BooleanQuery, TermQuery};
719
720        let mut schema_builder = SchemaBuilder::default();
721        let content = schema_builder.add_text_field("content", true, true);
722        let schema = schema_builder.build();
723
724        let dir = RamDirectory::new();
725        let config = IndexConfig::default();
726
727        // Create index with documents containing various terms
728        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
729            .await
730            .unwrap();
731
732        // Doc 0: contains "rust" and "programming"
733        let mut doc = Document::new();
734        doc.add_text(content, "rust programming language is fast");
735        writer.add_document(doc).unwrap();
736
737        // Doc 1: contains "rust" only
738        let mut doc = Document::new();
739        doc.add_text(content, "rust is a systems language");
740        writer.add_document(doc).unwrap();
741
742        // Doc 2: contains "programming" only
743        let mut doc = Document::new();
744        doc.add_text(content, "programming is fun");
745        writer.add_document(doc).unwrap();
746
747        // Doc 3: contains "python" (neither rust nor programming)
748        let mut doc = Document::new();
749        doc.add_text(content, "python is easy to learn");
750        writer.add_document(doc).unwrap();
751
752        // Doc 4: contains both "rust" and "programming" multiple times
753        let mut doc = Document::new();
754        doc.add_text(content, "rust rust programming programming systems");
755        writer.add_document(doc).unwrap();
756
757        writer.commit().await.unwrap();
758
759        // Open for reading
760        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
761
762        // Test 1: Pure OR query with multiple terms (should use MaxScore automatically)
763        let or_query = BooleanQuery::new()
764            .should(TermQuery::text(content, "rust"))
765            .should(TermQuery::text(content, "programming"));
766
767        let results = index.search(&or_query, 10).await.unwrap();
768
769        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
770        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
771
772        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
773        assert!(doc_ids.contains(&0), "Should find doc 0");
774        assert!(doc_ids.contains(&1), "Should find doc 1");
775        assert!(doc_ids.contains(&2), "Should find doc 2");
776        assert!(doc_ids.contains(&4), "Should find doc 4");
777        assert!(
778            !doc_ids.contains(&3),
779            "Should NOT find doc 3 (only has 'python')"
780        );
781
782        // Test 2: Single term query (should NOT use MaxScore, but still work)
783        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
784
785        let results = index.search(&single_query, 10).await.unwrap();
786        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
787
788        // Test 3: Query with MUST (should NOT use MaxScore)
789        let must_query = BooleanQuery::new()
790            .must(TermQuery::text(content, "rust"))
791            .should(TermQuery::text(content, "programming"));
792
793        let results = index.search(&must_query, 10).await.unwrap();
794        // Must have "rust", optionally "programming"
795        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
796
797        // Test 4: Query with MUST_NOT (should NOT use MaxScore)
798        let must_not_query = BooleanQuery::new()
799            .should(TermQuery::text(content, "rust"))
800            .should(TermQuery::text(content, "programming"))
801            .must_not(TermQuery::text(content, "systems"));
802
803        let results = index.search(&must_not_query, 10).await.unwrap();
804        // Should exclude docs with "systems" (doc 1 and 4)
805        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
806        assert!(
807            !doc_ids.contains(&1),
808            "Should NOT find doc 1 (has 'systems')"
809        );
810        assert!(
811            !doc_ids.contains(&4),
812            "Should NOT find doc 4 (has 'systems')"
813        );
814
815        // Test 5: Verify top-k limit works correctly with MaxScore
816        let or_query = BooleanQuery::new()
817            .should(TermQuery::text(content, "rust"))
818            .should(TermQuery::text(content, "programming"));
819
820        let results = index.search(&or_query, 2).await.unwrap();
821        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
822
823        // Top results should be docs that match both terms (higher scores)
824        // Doc 0 and 4 contain both "rust" and "programming"
825    }
826
827    /// Test that BooleanQuery with pure SHOULD clauses uses MaxScore and returns correct results
828    #[tokio::test]
829    async fn test_boolean_or_maxscore_optimization() {
830        use crate::query::{BooleanQuery, TermQuery};
831
832        let mut schema_builder = SchemaBuilder::default();
833        let content = schema_builder.add_text_field("content", true, true);
834        let schema = schema_builder.build();
835
836        let dir = RamDirectory::new();
837        let config = IndexConfig::default();
838
839        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
840            .await
841            .unwrap();
842
843        // Add several documents
844        for i in 0..10 {
845            let mut doc = Document::new();
846            let text = match i % 4 {
847                0 => "apple banana cherry",
848                1 => "apple orange",
849                2 => "banana grape",
850                _ => "cherry date",
851            };
852            doc.add_text(content, text);
853            writer.add_document(doc).unwrap();
854        }
855
856        writer.commit().await.unwrap();
857        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
858
859        // Pure SHOULD query — triggers MaxScore fast path
860        let query = BooleanQuery::new()
861            .should(TermQuery::text(content, "apple"))
862            .should(TermQuery::text(content, "banana"));
863
864        let results = index.search(&query, 10).await.unwrap();
865
866        // "apple" matches docs 0,1,4,5,8,9 and "banana" matches docs 0,2,4,6,8
867        // Union = {0,1,2,4,5,6,8,9} = 8 docs
868        assert_eq!(results.hits.len(), 8, "Should find all matching docs");
869    }
870
871    #[tokio::test]
872    async fn test_vector_index_threshold_switch() {
873        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
874
875        // Create schema with dense vector field configured for IVF-RaBitQ
876        let mut schema_builder = SchemaBuilder::default();
877        let title = schema_builder.add_text_field("title", true, true);
878        let embedding = schema_builder.add_dense_vector_field_with_config(
879            "embedding",
880            true, // indexed
881            true, // stored
882            DenseVectorConfig {
883                dim: 8,
884                index_type: VectorIndexType::IvfRaBitQ,
885                quantization: DenseVectorQuantization::F32,
886                num_clusters: Some(4), // Small for test
887                nprobe: 2,
888                build_threshold: Some(50), // Build when we have 50+ vectors
889                unit_norm: false,
890            },
891        );
892        let schema = schema_builder.build();
893
894        let dir = RamDirectory::new();
895        let config = IndexConfig::default();
896
897        // Phase 1: Add vectors below threshold (should use Flat index)
898        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
899            .await
900            .unwrap();
901
902        // Add 30 documents (below threshold of 50)
903        for i in 0..30 {
904            let mut doc = Document::new();
905            doc.add_text(title, format!("Document {}", i));
906            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
907            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
908            doc.add_dense_vector(embedding, vec);
909            writer.add_document(doc).unwrap();
910        }
911        writer.commit().await.unwrap();
912
913        // Open index and verify it's using Flat (not built yet)
914        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
915        assert!(
916            index.segment_manager.trained().is_none(),
917            "Should not have trained centroids below threshold"
918        );
919
920        // Search should work with Flat index
921        let query_vec: Vec<f32> = vec![0.5; 8];
922        let segments = index.segment_readers().await.unwrap();
923        assert!(!segments.is_empty());
924
925        let results = segments[0]
926            .search_dense_vector(
927                embedding,
928                &query_vec,
929                5,
930                0,
931                1,
932                crate::query::MultiValueCombiner::Max,
933            )
934            .await
935            .unwrap();
936        assert!(!results.is_empty(), "Flat search should return results");
937
938        // Phase 2: Add more vectors to cross threshold
939        let mut writer = IndexWriter::open(dir.clone(), config.clone())
940            .await
941            .unwrap();
942
943        // Add 30 more documents (total 60, above threshold of 50)
944        for i in 30..60 {
945            let mut doc = Document::new();
946            doc.add_text(title, format!("Document {}", i));
947            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
948            doc.add_dense_vector(embedding, vec);
949            writer.add_document(doc).unwrap();
950        }
951        writer.commit().await.unwrap();
952
953        // Manually trigger vector index build (no longer auto-triggered by commit)
954        writer.build_vector_index().await.unwrap();
955
956        // Reopen index and verify trained structures are loaded
957        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
958        assert!(
959            index.segment_manager.trained().is_some(),
960            "Should have loaded trained centroids for embedding field"
961        );
962
963        // Search should still work
964        let segments = index.segment_readers().await.unwrap();
965        let results = segments[0]
966            .search_dense_vector(
967                embedding,
968                &query_vec,
969                5,
970                0,
971                1,
972                crate::query::MultiValueCombiner::Max,
973            )
974            .await
975            .unwrap();
976        assert!(
977            !results.is_empty(),
978            "Search should return results after build"
979        );
980
981        // Phase 3: Verify calling build_vector_index again is a no-op
982        let writer = IndexWriter::open(dir.clone(), config.clone())
983            .await
984            .unwrap();
985        writer.build_vector_index().await.unwrap(); // Should skip training
986
987        // Still built (trained structures present in ArcSwap)
988        assert!(writer.segment_manager.trained().is_some());
989    }
990
991    /// Multi-round merge: flush many small segments, merge, add more, merge again.
992    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
993    #[tokio::test]
994    async fn test_multi_round_merge_with_search() {
995        let mut schema_builder = SchemaBuilder::default();
996        let title = schema_builder.add_text_field("title", true, true);
997        let body = schema_builder.add_text_field("body", true, true);
998        let schema = schema_builder.build();
999
1000        let dir = RamDirectory::new();
1001        let config = IndexConfig {
1002            max_indexing_memory_bytes: 512,
1003            ..Default::default()
1004        };
1005
1006        // --- Round 1: 5 segments × 10 docs = 50 docs ---
1007        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1008            .await
1009            .unwrap();
1010
1011        for batch in 0..5 {
1012            for i in 0..10 {
1013                let mut doc = Document::new();
1014                doc.add_text(
1015                    title,
1016                    format!("alpha bravo charlie batch{} doc{}", batch, i),
1017                );
1018                doc.add_text(
1019                    body,
1020                    format!("the quick brown fox jumps over the lazy dog number {}", i),
1021                );
1022                writer.add_document(doc).unwrap();
1023            }
1024            writer.commit().await.unwrap();
1025        }
1026
1027        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1028        let pre_merge_segments = index.segment_readers().await.unwrap().len();
1029        assert!(
1030            pre_merge_segments >= 3,
1031            "Expected >=3 segments, got {}",
1032            pre_merge_segments
1033        );
1034        assert_eq!(index.num_docs().await.unwrap(), 50);
1035
1036        // Search before merge
1037        let results = index.query("alpha", 100).await.unwrap();
1038        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1039
1040        let results = index.query("fox", 100).await.unwrap();
1041        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1042
1043        // --- Merge round 1 ---
1044        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1045            .await
1046            .unwrap();
1047        writer.force_merge().await.unwrap();
1048
1049        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1050        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1051        assert_eq!(index.num_docs().await.unwrap(), 50);
1052
1053        // Search after first merge
1054        let results = index.query("alpha", 100).await.unwrap();
1055        assert_eq!(
1056            results.hits.len(),
1057            50,
1058            "all 50 docs should match 'alpha' after merge 1"
1059        );
1060
1061        let results = index.query("fox", 100).await.unwrap();
1062        assert_eq!(
1063            results.hits.len(),
1064            50,
1065            "all 50 docs should match 'fox' after merge 1"
1066        );
1067
1068        // Verify all docs retrievable
1069        let reader1 = index.reader().await.unwrap();
1070        let searcher1 = reader1.searcher().await.unwrap();
1071        for i in 0..50 {
1072            let doc = searcher1.doc(i).await.unwrap();
1073            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1074        }
1075
1076        // --- Round 2: add 30 more docs in 3 segments ---
1077        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1078            .await
1079            .unwrap();
1080        for batch in 0..3 {
1081            for i in 0..10 {
1082                let mut doc = Document::new();
1083                doc.add_text(
1084                    title,
1085                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1086                );
1087                doc.add_text(
1088                    body,
1089                    format!("the quick brown fox jumps again number {}", i),
1090                );
1091                writer.add_document(doc).unwrap();
1092            }
1093            writer.commit().await.unwrap();
1094        }
1095
1096        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1097        assert_eq!(index.num_docs().await.unwrap(), 80);
1098        assert!(
1099            index.segment_readers().await.unwrap().len() >= 2,
1100            "Should have >=2 segments after round 2 ingestion"
1101        );
1102
1103        // Search spans both old merged segment and new segments
1104        let results = index.query("fox", 100).await.unwrap();
1105        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1106
1107        let results = index.query("alpha", 100).await.unwrap();
1108        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1109
1110        let results = index.query("delta", 100).await.unwrap();
1111        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1112
1113        // --- Merge round 2 ---
1114        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1115            .await
1116            .unwrap();
1117        writer.force_merge().await.unwrap();
1118
1119        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1120        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1121        assert_eq!(index.num_docs().await.unwrap(), 80);
1122
1123        // All searches still correct after second merge
1124        let results = index.query("fox", 100).await.unwrap();
1125        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1126
1127        let results = index.query("alpha", 100).await.unwrap();
1128        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1129
1130        let results = index.query("delta", 100).await.unwrap();
1131        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1132
1133        // Verify all 80 docs retrievable
1134        let reader2 = index.reader().await.unwrap();
1135        let searcher2 = reader2.searcher().await.unwrap();
1136        for i in 0..80 {
1137            let doc = searcher2.doc(i).await.unwrap();
1138            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1139        }
1140    }
1141
1142    /// Large-scale merge: many segments with overlapping terms, verifying
1143    /// BM25 scoring and doc retrieval after merge.
1144    #[tokio::test]
1145    async fn test_large_scale_merge_correctness() {
1146        let mut schema_builder = SchemaBuilder::default();
1147        let title = schema_builder.add_text_field("title", true, true);
1148        let schema = schema_builder.build();
1149
1150        let dir = RamDirectory::new();
1151        let config = IndexConfig {
1152            max_indexing_memory_bytes: 512,
1153            ..Default::default()
1154        };
1155
1156        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1157            .await
1158            .unwrap();
1159
1160        // 8 batches × 25 docs = 200 docs total
1161        // Terms: "common" appears in all, "unique_N" appears in batch N only
1162        let total_docs = 200u32;
1163        for batch in 0..8 {
1164            for i in 0..25 {
1165                let mut doc = Document::new();
1166                doc.add_text(
1167                    title,
1168                    format!("common shared term unique_{} item{}", batch, i),
1169                );
1170                writer.add_document(doc).unwrap();
1171            }
1172            writer.commit().await.unwrap();
1173        }
1174
1175        // Verify pre-merge
1176        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1177        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1178
1179        let results = index.query("common", 300).await.unwrap();
1180        assert_eq!(
1181            results.hits.len(),
1182            total_docs as usize,
1183            "all docs should match 'common'"
1184        );
1185
1186        // Each unique_N matches exactly 25 docs
1187        for batch in 0..8 {
1188            let q = format!("unique_{}", batch);
1189            let results = index.query(&q, 100).await.unwrap();
1190            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1191        }
1192
1193        // Force merge
1194        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1195            .await
1196            .unwrap();
1197        writer.force_merge().await.unwrap();
1198
1199        // Verify post-merge: single segment, same results
1200        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1201        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1202        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1203
1204        let results = index.query("common", 300).await.unwrap();
1205        assert_eq!(results.hits.len(), total_docs as usize);
1206
1207        for batch in 0..8 {
1208            let q = format!("unique_{}", batch);
1209            let results = index.query(&q, 100).await.unwrap();
1210            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1211        }
1212
1213        // Verify doc retrieval for every doc
1214        let reader = index.reader().await.unwrap();
1215        let searcher = reader.searcher().await.unwrap();
1216        for i in 0..total_docs {
1217            let doc = searcher.doc(i).await.unwrap();
1218            assert!(doc.is_some(), "doc {} missing after merge", i);
1219        }
1220    }
1221
1222    /// Test that auto-merge is triggered by the merge policy during commit,
1223    /// without calling force_merge. Uses MmapDirectory and higher parallelism
1224    /// to reproduce production conditions.
1225    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1226    async fn test_auto_merge_triggered() {
1227        use crate::directories::MmapDirectory;
1228        let tmp_dir = tempfile::tempdir().unwrap();
1229        let dir = MmapDirectory::new(tmp_dir.path());
1230
1231        let mut schema_builder = SchemaBuilder::default();
1232        let title = schema_builder.add_text_field("title", true, true);
1233        let body = schema_builder.add_text_field("body", true, true);
1234        let schema = schema_builder.build();
1235
1236        // Aggressive policy: merge when 3 segments in same tier
1237        let config = IndexConfig {
1238            max_indexing_memory_bytes: 4096,
1239            num_indexing_threads: 4,
1240            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1241            ..Default::default()
1242        };
1243
1244        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1245            .await
1246            .unwrap();
1247
1248        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
1249        for batch in 0..12 {
1250            for i in 0..50 {
1251                let mut doc = Document::new();
1252                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1253                doc.add_text(
1254                    body,
1255                    format!(
1256                        "the quick brown fox jumps over lazy dog number {} round {}",
1257                        i, batch
1258                    ),
1259                );
1260                writer.add_document(doc).unwrap();
1261            }
1262            writer.commit().await.unwrap();
1263        }
1264
1265        let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1266
1267        // wait_for_merging_thread waits for the single in-flight merge. After it completes,
1268        // re-evaluate since segments accumulated while the merge was running.
1269        writer.wait_for_merging_thread().await;
1270        writer.maybe_merge().await;
1271        writer.wait_for_merging_thread().await;
1272
1273        // After commit + auto-merge, segment count should be reduced
1274        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1275        let segment_count = index.segment_readers().await.unwrap().len();
1276        eprintln!(
1277            "Segments: {} before merge, {} after auto-merge",
1278            pre_merge, segment_count
1279        );
1280        assert!(
1281            segment_count < pre_merge,
1282            "Expected auto-merge to reduce segments from {}, got {}",
1283            pre_merge,
1284            segment_count
1285        );
1286    }
1287
1288    /// Regression test: commit with dense vector fields + aggressive merge policy.
1289    /// Exercises the race where background merge deletes segment files while
1290    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
1291    /// Before the fix, this would fail with "IO error: No such file or directory".
1292    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1293    async fn test_commit_with_vectors_and_background_merge() {
1294        use crate::directories::MmapDirectory;
1295        use crate::dsl::DenseVectorConfig;
1296
1297        let tmp_dir = tempfile::tempdir().unwrap();
1298        let dir = MmapDirectory::new(tmp_dir.path());
1299
1300        let mut schema_builder = SchemaBuilder::default();
1301        let title = schema_builder.add_text_field("title", true, true);
1302        // RaBitQ with very low build_threshold so vector index building triggers during commit
1303        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1304        let embedding =
1305            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1306        let schema = schema_builder.build();
1307
1308        // Aggressive merge: triggers background merges at 3 segments per tier
1309        let config = IndexConfig {
1310            max_indexing_memory_bytes: 4096,
1311            num_indexing_threads: 4,
1312            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1313            ..Default::default()
1314        };
1315
1316        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1317            .await
1318            .unwrap();
1319
1320        // Create 12 segments with vectors — enough to trigger both
1321        // background merges (aggressive policy) and vector index building (threshold=10)
1322        for batch in 0..12 {
1323            for i in 0..5 {
1324                let mut doc = Document::new();
1325                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1326                // 8-dim random vector
1327                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1328                doc.add_dense_vector(embedding, vec);
1329                writer.add_document(doc).unwrap();
1330            }
1331            writer.commit().await.unwrap();
1332        }
1333        writer.wait_for_merging_thread().await;
1334
1335        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1336        let num_docs = index.num_docs().await.unwrap();
1337        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1338    }
1339
1340    /// Stress test: force_merge with many segments (iterative batching).
1341    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
1342    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1343    async fn test_force_merge_many_segments() {
1344        use crate::directories::MmapDirectory;
1345        let tmp_dir = tempfile::tempdir().unwrap();
1346        let dir = MmapDirectory::new(tmp_dir.path());
1347
1348        let mut schema_builder = SchemaBuilder::default();
1349        let title = schema_builder.add_text_field("title", true, true);
1350        let schema = schema_builder.build();
1351
1352        let config = IndexConfig {
1353            max_indexing_memory_bytes: 512,
1354            ..Default::default()
1355        };
1356
1357        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1358            .await
1359            .unwrap();
1360
1361        // Create 50 tiny segments
1362        for batch in 0..50 {
1363            for i in 0..3 {
1364                let mut doc = Document::new();
1365                doc.add_text(title, format!("term_{} batch_{}", i, batch));
1366                writer.add_document(doc).unwrap();
1367            }
1368            writer.commit().await.unwrap();
1369        }
1370        // Wait for background merges before reading segment count
1371        writer.wait_for_merging_thread().await;
1372
1373        let seg_ids = writer.segment_manager.get_segment_ids().await;
1374        let pre = seg_ids.len();
1375        eprintln!("Segments before force_merge: {}", pre);
1376        assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1377
1378        // Force merge all into one — should iterate in batches, not OOM
1379        writer.force_merge().await.unwrap();
1380
1381        let index2 = Index::open(dir, config).await.unwrap();
1382        let post = index2.segment_readers().await.unwrap().len();
1383        eprintln!("Segments after force_merge: {}", post);
1384        assert_eq!(post, 1);
1385        assert_eq!(index2.num_docs().await.unwrap(), 150);
1386    }
1387
1388    /// Test that background merges produce correct generation metadata.
1389    /// Creates many segments with aggressive policy, commits, waits for merges,
1390    /// and verifies that merged segments have generation >= 1 with correct ancestors.
1391    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1392    async fn test_background_merge_generation() {
1393        use crate::directories::MmapDirectory;
1394        let tmp_dir = tempfile::tempdir().unwrap();
1395        let dir = MmapDirectory::new(tmp_dir.path());
1396
1397        let mut schema_builder = SchemaBuilder::default();
1398        let title = schema_builder.add_text_field("title", true, true);
1399        let schema = schema_builder.build();
1400
1401        let config = IndexConfig {
1402            max_indexing_memory_bytes: 4096,
1403            num_indexing_threads: 2,
1404            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1405            ..Default::default()
1406        };
1407
1408        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1409            .await
1410            .unwrap();
1411
1412        // Create 15 small segments — enough for aggressive policy to trigger merges
1413        for batch in 0..15 {
1414            for i in 0..5 {
1415                let mut doc = Document::new();
1416                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1417                writer.add_document(doc).unwrap();
1418            }
1419            writer.commit().await.unwrap();
1420        }
1421        writer.wait_for_merging_thread().await;
1422
1423        // Read metadata and verify generation tracking
1424        let metas = writer
1425            .segment_manager
1426            .read_metadata(|m| m.segment_metas.clone())
1427            .await;
1428
1429        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1430        eprintln!(
1431            "Segments after merge: {}, max generation: {}",
1432            metas.len(),
1433            max_gen
1434        );
1435
1436        // Background merges should have produced at least one merged segment (gen >= 1)
1437        assert!(
1438            max_gen >= 1,
1439            "Expected at least one merged segment (gen >= 1), got max_gen={}",
1440            max_gen
1441        );
1442
1443        // Every merged segment (gen > 0) must have non-empty ancestors
1444        for (id, info) in &metas {
1445            if info.generation > 0 {
1446                assert!(
1447                    !info.ancestors.is_empty(),
1448                    "Segment {} has gen={} but no ancestors",
1449                    id,
1450                    info.generation
1451                );
1452            } else {
1453                assert!(
1454                    info.ancestors.is_empty(),
1455                    "Fresh segment {} has gen=0 but has ancestors",
1456                    id
1457                );
1458            }
1459        }
1460    }
1461
1462    /// Test that merging preserves every single document.
1463    /// Indexes 1000+ unique documents across many segments, force-merges,
1464    /// and verifies exact doc count and that every unique term is searchable.
1465    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1466    async fn test_merge_preserves_all_documents() {
1467        use crate::directories::MmapDirectory;
1468        let tmp_dir = tempfile::tempdir().unwrap();
1469        let dir = MmapDirectory::new(tmp_dir.path());
1470
1471        let mut schema_builder = SchemaBuilder::default();
1472        let title = schema_builder.add_text_field("title", true, true);
1473        let schema = schema_builder.build();
1474
1475        let config = IndexConfig {
1476            max_indexing_memory_bytes: 4096,
1477            ..Default::default()
1478        };
1479
1480        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1481            .await
1482            .unwrap();
1483
1484        let total_docs = 1200;
1485        let docs_per_batch = 60;
1486        let batches = total_docs / docs_per_batch;
1487
1488        // Each doc has a unique term "uid_N" for verification
1489        for batch in 0..batches {
1490            for i in 0..docs_per_batch {
1491                let doc_num = batch * docs_per_batch + i;
1492                let mut doc = Document::new();
1493                doc.add_text(
1494                    title,
1495                    format!("uid_{} common_term batch_{}", doc_num, batch),
1496                );
1497                writer.add_document(doc).unwrap();
1498            }
1499            writer.commit().await.unwrap();
1500        }
1501
1502        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1503        assert!(
1504            pre_segments >= 2,
1505            "Need multiple segments, got {}",
1506            pre_segments
1507        );
1508
1509        // Force merge to single segment
1510        writer.force_merge().await.unwrap();
1511
1512        let index = Index::open(dir, config).await.unwrap();
1513        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1514        assert_eq!(
1515            index.num_docs().await.unwrap(),
1516            total_docs as u32,
1517            "Doc count mismatch after force_merge"
1518        );
1519
1520        // Verify every unique document is searchable
1521        let results = index.query("common_term", total_docs + 100).await.unwrap();
1522        assert_eq!(
1523            results.hits.len(),
1524            total_docs,
1525            "common_term should match all docs"
1526        );
1527
1528        // Spot-check unique IDs across the range
1529        for check in [0, 1, total_docs / 2, total_docs - 1] {
1530            let q = format!("uid_{}", check);
1531            let results = index.query(&q, 10).await.unwrap();
1532            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1533        }
1534    }
1535
1536    /// Multi-round commit+merge: verify doc count grows correctly
1537    /// and no documents are lost across multiple merge cycles.
1538    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1539    async fn test_multi_round_merge_doc_integrity() {
1540        use crate::directories::MmapDirectory;
1541        let tmp_dir = tempfile::tempdir().unwrap();
1542        let dir = MmapDirectory::new(tmp_dir.path());
1543
1544        let mut schema_builder = SchemaBuilder::default();
1545        let title = schema_builder.add_text_field("title", true, true);
1546        let schema = schema_builder.build();
1547
1548        let config = IndexConfig {
1549            max_indexing_memory_bytes: 4096,
1550            num_indexing_threads: 2,
1551            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1552            ..Default::default()
1553        };
1554
1555        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1556            .await
1557            .unwrap();
1558
1559        let mut expected_total = 0u64;
1560
1561        // 4 rounds of: add docs → commit → wait for merges → verify count
1562        for round in 0..4 {
1563            let docs_this_round = 50 + round * 25; // 50, 75, 100, 125
1564            for batch in 0..5 {
1565                for i in 0..docs_this_round / 5 {
1566                    let mut doc = Document::new();
1567                    doc.add_text(
1568                        title,
1569                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
1570                    );
1571                    writer.add_document(doc).unwrap();
1572                }
1573                writer.commit().await.unwrap();
1574            }
1575            writer.wait_for_merging_thread().await;
1576
1577            expected_total += docs_this_round as u64;
1578
1579            let actual = writer
1580                .segment_manager
1581                .read_metadata(|m| {
1582                    m.segment_metas
1583                        .values()
1584                        .map(|s| s.num_docs as u64)
1585                        .sum::<u64>()
1586                })
1587                .await;
1588
1589            assert_eq!(
1590                actual, expected_total,
1591                "Round {}: expected {} docs, metadata reports {}",
1592                round, expected_total, actual
1593            );
1594        }
1595
1596        // Final verify: open fresh and query
1597        let index = Index::open(dir, config).await.unwrap();
1598        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);
1599
1600        let results = index
1601            .query("searchable", expected_total as usize + 100)
1602            .await
1603            .unwrap();
1604        assert_eq!(
1605            results.hits.len(),
1606            expected_total as usize,
1607            "All docs should match 'searchable'"
1608        );
1609
1610        // Check generation grew across rounds
1611        let metas = index
1612            .segment_manager()
1613            .read_metadata(|m| m.segment_metas.clone())
1614            .await;
1615        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1616        eprintln!(
1617            "Final: {} segments, {} docs, max generation={}",
1618            metas.len(),
1619            expected_total,
1620            max_gen
1621        );
1622        assert!(
1623            max_gen >= 1,
1624            "Multiple merge rounds should produce gen >= 1"
1625        );
1626    }
1627
1628    /// Sustained indexing: verify segment count stays O(logN) bounded.
1629    ///
1630    /// Indexes many small batches with aggressive merge policy and checks that
1631    /// the segment count never grows unbounded. With tiered merging the count
1632    /// should stay roughly O(segments_per_tier * num_tiers) ≈ O(log(N)).
1633    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1634    async fn test_segment_count_bounded_during_sustained_indexing() {
1635        use crate::directories::MmapDirectory;
1636        let tmp_dir = tempfile::tempdir().unwrap();
1637        let dir = MmapDirectory::new(tmp_dir.path());
1638
1639        let mut schema_builder = SchemaBuilder::default();
1640        let title = schema_builder.add_text_field("title", true, false);
1641        let schema = schema_builder.build();
1642
1643        let policy = crate::merge::TieredMergePolicy {
1644            segments_per_tier: 3,
1645            max_merge_at_once: 5,
1646            tier_factor: 10.0,
1647            tier_floor: 50,
1648            max_merged_docs: 1_000_000,
1649        };
1650
1651        let config = IndexConfig {
1652            max_indexing_memory_bytes: 4096, // tiny budget → frequent flushes
1653            num_indexing_threads: 1,
1654            merge_policy: Box::new(policy),
1655            max_concurrent_merges: 4,
1656            ..Default::default()
1657        };
1658
1659        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1660            .await
1661            .unwrap();
1662
1663        let num_commits = 40;
1664        let docs_per_commit = 30;
1665        let total_docs = num_commits * docs_per_commit;
1666        let mut max_segments_seen = 0usize;
1667
1668        for commit_idx in 0..num_commits {
1669            for i in 0..docs_per_commit {
1670                let mut doc = Document::new();
1671                doc.add_text(
1672                    title,
1673                    format!("doc_{} text", commit_idx * docs_per_commit + i),
1674                );
1675                writer.add_document(doc).unwrap();
1676            }
1677            writer.commit().await.unwrap();
1678
1679            // Give background merges a moment to run
1680            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1681
1682            let seg_count = writer.segment_manager.get_segment_ids().await.len();
1683            max_segments_seen = max_segments_seen.max(seg_count);
1684        }
1685
1686        // Wait for all merges to finish
1687        writer.wait_for_all_merges().await;
1688
1689        let final_segments = writer.segment_manager.get_segment_ids().await.len();
1690        let final_docs: u64 = writer
1691            .segment_manager
1692            .read_metadata(|m| {
1693                m.segment_metas
1694                    .values()
1695                    .map(|s| s.num_docs as u64)
1696                    .sum::<u64>()
1697            })
1698            .await;
1699
1700        eprintln!(
1701            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
1702            num_commits, total_docs, final_segments, max_segments_seen
1703        );
1704
1705        // With 1200 docs and segments_per_tier=3, tier_floor=50:
1706        // tier 0: ≤50 docs, tier 1: 50-500, tier 2: 500-5000
1707        // We should have at most ~3 segments per tier * ~3 tiers ≈ 9-12 segments at peak.
1708        // The key invariant: segment count must NOT grow linearly with commits.
1709        // 40 commits should NOT produce 40 segments.
1710        let max_allowed = num_commits / 2; // generous: at most half the commits as segments
1711        assert!(
1712            max_segments_seen <= max_allowed,
1713            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
1714             Merging is not keeping up.",
1715            max_segments_seen,
1716            max_allowed,
1717            num_commits
1718        );
1719
1720        // After all merges complete, should be well under the limit
1721        assert!(
1722            final_segments <= 10,
1723            "After all merges, expected ≤10 segments, got {}",
1724            final_segments
1725        );
1726
1727        // No data loss
1728        assert_eq!(
1729            final_docs, total_docs as u64,
1730            "Expected {} docs, metadata reports {}",
1731            total_docs, final_docs
1732        );
1733    }
1734
1735    // ========================================================================
1736    // Needle-in-haystack comprehensive tests
1737    // ========================================================================
1738
1739    /// Full-text needle-in-haystack: one unique term among many documents.
1740    /// Verifies exact retrieval, scoring, and document content after commit + reopen.
1741    #[tokio::test]
1742    async fn test_needle_fulltext_single_segment() {
1743        let mut sb = SchemaBuilder::default();
1744        let title = sb.add_text_field("title", true, true);
1745        let body = sb.add_text_field("body", true, true);
1746        let schema = sb.build();
1747
1748        let dir = RamDirectory::new();
1749        let config = IndexConfig::default();
1750        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1751            .await
1752            .unwrap();
1753
1754        // 100 hay documents
1755        for i in 0..100 {
1756            let mut doc = Document::new();
1757            doc.add_text(title, format!("Hay document number {}", i));
1758            doc.add_text(
1759                body,
1760                "common words repeated across all hay documents filler text",
1761            );
1762            writer.add_document(doc).unwrap();
1763        }
1764
1765        // 1 needle document (doc 100)
1766        let mut needle = Document::new();
1767        needle.add_text(title, "The unique needle xylophone");
1768        needle.add_text(
1769            body,
1770            "This document contains the extraordinary term xylophone",
1771        );
1772        // Insert needle among hay by re-adding remaining hay after it
1773        // Actually, we already added 100, so needle is doc 100
1774        writer.add_document(needle).unwrap();
1775
1776        // 50 more hay documents after needle
1777        for i in 100..150 {
1778            let mut doc = Document::new();
1779            doc.add_text(title, format!("More hay document {}", i));
1780            doc.add_text(body, "common words filler text again and again");
1781            writer.add_document(doc).unwrap();
1782        }
1783
1784        writer.commit().await.unwrap();
1785
1786        let index = Index::open(dir, config).await.unwrap();
1787        assert_eq!(index.num_docs().await.unwrap(), 151);
1788
1789        // Search for the needle term
1790        let results = index.query("xylophone", 10).await.unwrap();
1791        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
1792        assert!(results.hits[0].score > 0.0, "Score should be positive");
1793
1794        // Verify document content
1795        let doc = index
1796            .get_document(&results.hits[0].address)
1797            .await
1798            .unwrap()
1799            .unwrap();
1800        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1801        assert!(
1802            title_val.contains("xylophone"),
1803            "Retrieved doc should be the needle"
1804        );
1805
1806        // Search for common term — should return many
1807        let results = index.query("common", 200).await.unwrap();
1808        assert!(
1809            results.hits.len() >= 100,
1810            "Common term should match many docs"
1811        );
1812
1813        // Negative test — term that doesn't exist
1814        let results = index.query("nonexistentterm99999", 10).await.unwrap();
1815        assert_eq!(
1816            results.hits.len(),
1817            0,
1818            "Non-existent term should match nothing"
1819        );
1820    }
1821
1822    /// Full-text needle across multiple segments: ensures cross-segment search works.
1823    #[tokio::test]
1824    async fn test_needle_fulltext_multi_segment() {
1825        use crate::query::TermQuery;
1826
1827        let mut sb = SchemaBuilder::default();
1828        let content = sb.add_text_field("content", true, true);
1829        let schema = sb.build();
1830
1831        let dir = RamDirectory::new();
1832        let config = IndexConfig::default();
1833        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1834            .await
1835            .unwrap();
1836
1837        // Segment 1: 50 hay docs
1838        for i in 0..50 {
1839            let mut doc = Document::new();
1840            doc.add_text(content, format!("segment one hay document {}", i));
1841            writer.add_document(doc).unwrap();
1842        }
1843        writer.commit().await.unwrap();
1844
1845        // Segment 2: needle + 49 hay docs
1846        let mut needle = Document::new();
1847        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
1848        writer.add_document(needle).unwrap();
1849        for i in 0..49 {
1850            let mut doc = Document::new();
1851            doc.add_text(content, format!("segment two hay document {}", i));
1852            writer.add_document(doc).unwrap();
1853        }
1854        writer.commit().await.unwrap();
1855
1856        // Segment 3: 50 more hay docs
1857        for i in 0..50 {
1858            let mut doc = Document::new();
1859            doc.add_text(content, format!("segment three hay document {}", i));
1860            writer.add_document(doc).unwrap();
1861        }
1862        writer.commit().await.unwrap();
1863
1864        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1865        assert_eq!(index.num_docs().await.unwrap(), 150);
1866        let num_segments = index.segment_readers().await.unwrap().len();
1867        assert!(
1868            num_segments >= 2,
1869            "Should have multiple segments, got {}",
1870            num_segments
1871        );
1872
1873        // Find needle across segments
1874        let results = index.query("quetzalcoatl", 10).await.unwrap();
1875        assert_eq!(
1876            results.hits.len(),
1877            1,
1878            "Should find exactly 1 needle across segments"
1879        );
1880
1881        // Verify using TermQuery directly
1882        let reader = index.reader().await.unwrap();
1883        let searcher = reader.searcher().await.unwrap();
1884        let tq = TermQuery::text(content, "quetzalcoatl");
1885        let results = searcher.search(&tq, 10).await.unwrap();
1886        assert_eq!(results.len(), 1, "TermQuery should also find the needle");
1887
1888        // Verify content
1889        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
1890        let text = doc.get_first(content).unwrap().as_text().unwrap();
1891        assert!(
1892            text.contains("quetzalcoatl"),
1893            "Should retrieve needle content"
1894        );
1895
1896        // Cross-segment term that exists in all segments
1897        let results = index.query("document", 200).await.unwrap();
1898        assert!(
1899            results.hits.len() >= 149,
1900            "Should find hay docs across all segments"
1901        );
1902    }
1903
1904    /// Sparse vector needle-in-haystack: one document with unique dimensions.
1905    #[tokio::test]
1906    async fn test_needle_sparse_vector() {
1907        use crate::query::SparseVectorQuery;
1908
1909        let mut sb = SchemaBuilder::default();
1910        let title = sb.add_text_field("title", true, true);
1911        let sparse = sb.add_sparse_vector_field("sparse", true, true);
1912        let schema = sb.build();
1913
1914        let dir = RamDirectory::new();
1915        let config = IndexConfig::default();
1916        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1917            .await
1918            .unwrap();
1919
1920        // 100 hay documents with sparse vectors on dimensions 0-9
1921        for i in 0..100 {
1922            let mut doc = Document::new();
1923            doc.add_text(title, format!("Hay sparse doc {}", i));
1924            // All hay docs share dimensions 0-9 with varying weights
1925            let entries: Vec<(u32, f32)> = (0..10)
1926                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
1927                .collect();
1928            doc.add_sparse_vector(sparse, entries);
1929            writer.add_document(doc).unwrap();
1930        }
1931
1932        // Needle: unique dimensions 1000, 1001, 1002 (no other doc has these)
1933        let mut needle = Document::new();
1934        needle.add_text(title, "Needle sparse document");
1935        needle.add_sparse_vector(
1936            sparse,
1937            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
1938        );
1939        writer.add_document(needle).unwrap();
1940
1941        // 50 more hay docs
1942        for i in 100..150 {
1943            let mut doc = Document::new();
1944            doc.add_text(title, format!("More hay sparse doc {}", i));
1945            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
1946            doc.add_sparse_vector(sparse, entries);
1947            writer.add_document(doc).unwrap();
1948        }
1949
1950        writer.commit().await.unwrap();
1951
1952        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1953        assert_eq!(index.num_docs().await.unwrap(), 151);
1954
1955        // Query with needle's unique dimensions
1956        let reader = index.reader().await.unwrap();
1957        let searcher = reader.searcher().await.unwrap();
1958        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
1959        let results = searcher.search(&query, 10).await.unwrap();
1960        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
1961        assert!(results[0].score > 0.0, "Needle score should be positive");
1962
1963        // Verify it's the right document
1964        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
1965        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1966        assert_eq!(title_val, "Needle sparse document");
1967
1968        // Query with shared dimension — should match many
1969        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
1970        let results = searcher.search(&query_shared, 200).await.unwrap();
1971        assert!(
1972            results.len() >= 100,
1973            "Shared dim 5 should match many docs, got {}",
1974            results.len()
1975        );
1976
1977        // Query with non-existent dimension — should match nothing
1978        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
1979        let results = searcher.search(&query_missing, 10).await.unwrap();
1980        assert_eq!(
1981            results.len(),
1982            0,
1983            "Non-existent dimension should match nothing"
1984        );
1985    }
1986
1987    /// Sparse vector needle across multiple segments with merge.
1988    #[tokio::test]
1989    async fn test_needle_sparse_vector_multi_segment_merge() {
1990        use crate::query::SparseVectorQuery;
1991
1992        let mut sb = SchemaBuilder::default();
1993        let title = sb.add_text_field("title", true, true);
1994        let sparse = sb.add_sparse_vector_field("sparse", true, true);
1995        let schema = sb.build();
1996
1997        let dir = RamDirectory::new();
1998        let config = IndexConfig::default();
1999        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2000            .await
2001            .unwrap();
2002
2003        // Segment 1: hay
2004        for i in 0..30 {
2005            let mut doc = Document::new();
2006            doc.add_text(title, format!("seg1 hay {}", i));
2007            doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2008            writer.add_document(doc).unwrap();
2009        }
2010        writer.commit().await.unwrap();
2011
2012        // Segment 2: needle + hay
2013        let mut needle = Document::new();
2014        needle.add_text(title, "seg2 needle");
2015        needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
2016        writer.add_document(needle).unwrap();
2017        for i in 0..29 {
2018            let mut doc = Document::new();
2019            doc.add_text(title, format!("seg2 hay {}", i));
2020            doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
2021            writer.add_document(doc).unwrap();
2022        }
2023        writer.commit().await.unwrap();
2024
2025        // Verify pre-merge
2026        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2027        assert_eq!(index.num_docs().await.unwrap(), 60);
2028
2029        let reader = index.reader().await.unwrap();
2030        let searcher = reader.searcher().await.unwrap();
2031        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2032        let results = searcher.search(&query, 10).await.unwrap();
2033        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
2034        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2035        assert_eq!(
2036            doc.get_first(title).unwrap().as_text().unwrap(),
2037            "seg2 needle"
2038        );
2039
2040        // Force merge
2041        let mut writer = IndexWriter::open(dir.clone(), config.clone())
2042            .await
2043            .unwrap();
2044        writer.force_merge().await.unwrap();
2045
2046        // Verify post-merge
2047        let index = Index::open(dir, config).await.unwrap();
2048        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
2049        assert_eq!(index.num_docs().await.unwrap(), 60);
2050
2051        let reader = index.reader().await.unwrap();
2052        let searcher = reader.searcher().await.unwrap();
2053        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2054        let results = searcher.search(&query, 10).await.unwrap();
2055        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
2056        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2057        assert_eq!(
2058            doc.get_first(title).unwrap().as_text().unwrap(),
2059            "seg2 needle"
2060        );
2061    }
2062
2063    /// Dense vector needle-in-haystack using brute-force (Flat) search.
2064    #[tokio::test]
2065    async fn test_needle_dense_vector_flat() {
2066        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2067        use crate::query::DenseVectorQuery;
2068
2069        let dim = 16;
2070        let mut sb = SchemaBuilder::default();
2071        let title = sb.add_text_field("title", true, true);
2072        let embedding = sb.add_dense_vector_field_with_config(
2073            "embedding",
2074            true,
2075            true,
2076            DenseVectorConfig {
2077                dim,
2078                index_type: VectorIndexType::Flat,
2079                quantization: crate::dsl::DenseVectorQuantization::F32,
2080                num_clusters: None,
2081                nprobe: 0,
2082                build_threshold: None,
2083                unit_norm: false,
2084            },
2085        );
2086        let schema = sb.build();
2087
2088        let dir = RamDirectory::new();
2089        let config = IndexConfig::default();
2090        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2091            .await
2092            .unwrap();
2093
2094        // 100 hay docs: vectors near origin (small random-ish values)
2095        for i in 0..100 {
2096            let mut doc = Document::new();
2097            doc.add_text(title, format!("Hay dense doc {}", i));
2098            // Hay vectors: low-magnitude, varying direction
2099            let vec: Vec<f32> = (0..dim)
2100                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
2101                .collect();
2102            doc.add_dense_vector(embedding, vec);
2103            writer.add_document(doc).unwrap();
2104        }
2105
2106        // Needle: vector pointing strongly in one direction [1,1,1,...,1]
2107        let mut needle = Document::new();
2108        needle.add_text(title, "Needle dense document");
2109        let needle_vec: Vec<f32> = vec![1.0; dim];
2110        needle.add_dense_vector(embedding, needle_vec.clone());
2111        writer.add_document(needle).unwrap();
2112
2113        writer.commit().await.unwrap();
2114
2115        let index = Index::open(dir, config).await.unwrap();
2116        assert_eq!(index.num_docs().await.unwrap(), 101);
2117
2118        // Query with the needle vector — it should be the top result
2119        let reader = index.reader().await.unwrap();
2120        let searcher = reader.searcher().await.unwrap();
2121        let query = DenseVectorQuery::new(embedding, needle_vec);
2122        let results = searcher.search(&query, 5).await.unwrap();
2123        assert!(!results.is_empty(), "Should find at least 1 result");
2124
2125        // The needle (exact match) should be the top result with highest score
2126        let top_doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2127        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
2128        assert_eq!(
2129            top_title, "Needle dense document",
2130            "Top result should be the needle (exact vector match)"
2131        );
2132        assert!(
2133            results[0].score > 0.9,
2134            "Exact match should have very high cosine similarity, got {}",
2135            results[0].score
2136        );
2137    }
2138
2139    /// Combined: full-text + sparse + dense in the same index.
2140    /// Verifies all three retrieval paths work independently on the same dataset.
2141    #[tokio::test]
2142    async fn test_needle_combined_all_modalities() {
2143        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2144        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};
2145
2146        let dim = 8;
2147        let mut sb = SchemaBuilder::default();
2148        let title = sb.add_text_field("title", true, true);
2149        let body = sb.add_text_field("body", true, true);
2150        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2151        let embedding = sb.add_dense_vector_field_with_config(
2152            "embedding",
2153            true,
2154            true,
2155            DenseVectorConfig {
2156                dim,
2157                index_type: VectorIndexType::Flat,
2158                quantization: crate::dsl::DenseVectorQuantization::F32,
2159                num_clusters: None,
2160                nprobe: 0,
2161                build_threshold: None,
2162                unit_norm: false,
2163            },
2164        );
2165        let schema = sb.build();
2166
2167        let dir = RamDirectory::new();
2168        let config = IndexConfig::default();
2169        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2170            .await
2171            .unwrap();
2172
2173        // 80 hay docs with all three modalities
2174        for i in 0..80u32 {
2175            let mut doc = Document::new();
2176            doc.add_text(title, format!("Hay doc {}", i));
2177            doc.add_text(body, "general filler text about nothing special");
2178            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
2179            let vec: Vec<f32> = (0..dim)
2180                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
2181                .collect();
2182            doc.add_dense_vector(embedding, vec);
2183            writer.add_document(doc).unwrap();
2184        }
2185
2186        // Needle doc: unique in ALL three modalities
2187        let mut needle = Document::new();
2188        needle.add_text(title, "The extraordinary rhinoceros");
2189        needle.add_text(
2190            body,
2191            "This document about rhinoceros is the only one with this word",
2192        );
2193        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
2194        let needle_vec = vec![0.9; dim];
2195        needle.add_dense_vector(embedding, needle_vec.clone());
2196        writer.add_document(needle).unwrap();
2197
2198        writer.commit().await.unwrap();
2199
2200        let index = Index::open(dir, config).await.unwrap();
2201        assert_eq!(index.num_docs().await.unwrap(), 81);
2202
2203        let reader = index.reader().await.unwrap();
2204        let searcher = reader.searcher().await.unwrap();
2205
2206        // --- Full-text needle ---
2207        let tq = TermQuery::text(body, "rhinoceros");
2208        let results = searcher.search(&tq, 10).await.unwrap();
2209        assert_eq!(
2210            results.len(),
2211            1,
2212            "Full-text: should find exactly the needle"
2213        );
2214        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2215        assert!(
2216            doc.get_first(title)
2217                .unwrap()
2218                .as_text()
2219                .unwrap()
2220                .contains("rhinoceros")
2221        );
2222
2223        // --- Sparse vector needle ---
2224        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
2225        let results = searcher.search(&sq, 10).await.unwrap();
2226        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
2227        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2228        assert!(
2229            doc.get_first(title)
2230                .unwrap()
2231                .as_text()
2232                .unwrap()
2233                .contains("rhinoceros")
2234        );
2235
2236        // --- Dense vector needle ---
2237        let dq = DenseVectorQuery::new(embedding, needle_vec);
2238        let results = searcher.search(&dq, 1).await.unwrap();
2239        assert!(!results.is_empty(), "Dense: should find at least 1 result");
2240        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2241        assert_eq!(
2242            doc.get_first(title).unwrap().as_text().unwrap(),
2243            "The extraordinary rhinoceros",
2244            "Dense: top-1 should be the needle"
2245        );
2246
2247        // Verify all three found the same document
2248        let ft_doc_id = {
2249            let tq = TermQuery::text(body, "rhinoceros");
2250            let r = searcher.search(&tq, 1).await.unwrap();
2251            r[0].doc_id
2252        };
2253        let sp_doc_id = {
2254            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
2255            let r = searcher.search(&sq, 1).await.unwrap();
2256            r[0].doc_id
2257        };
2258        let dn_doc_id = {
2259            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
2260            let r = searcher.search(&dq, 1).await.unwrap();
2261            r[0].doc_id
2262        };
2263
2264        assert_eq!(
2265            ft_doc_id, sp_doc_id,
2266            "Full-text and sparse should find same doc"
2267        );
2268        assert_eq!(
2269            sp_doc_id, dn_doc_id,
2270            "Sparse and dense should find same doc"
2271        );
2272    }
2273
2274    /// Stress test: many needles scattered across segments, verify ALL are found.
2275    #[tokio::test]
2276    async fn test_many_needles_all_found() {
2277        let mut sb = SchemaBuilder::default();
2278        let content = sb.add_text_field("content", true, true);
2279        let schema = sb.build();
2280
2281        let dir = RamDirectory::new();
2282        let config = IndexConfig::default();
2283        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2284            .await
2285            .unwrap();
2286
2287        let num_needles = 20usize;
2288        let hay_per_batch = 50usize;
2289        let needle_terms: Vec<String> = (0..num_needles)
2290            .map(|i| format!("uniqueneedle{:04}", i))
2291            .collect();
2292
2293        // Interleave needles with hay across commits
2294        for batch in 0..4 {
2295            // Hay
2296            for i in 0..hay_per_batch {
2297                let mut doc = Document::new();
2298                doc.add_text(
2299                    content,
2300                    format!("hay batch {} item {} common filler", batch, i),
2301                );
2302                writer.add_document(doc).unwrap();
2303            }
2304            // 5 needles per batch
2305            for n in 0..5 {
2306                let needle_idx = batch * 5 + n;
2307                let mut doc = Document::new();
2308                doc.add_text(
2309                    content,
2310                    format!("this is {} among many documents", needle_terms[needle_idx]),
2311                );
2312                writer.add_document(doc).unwrap();
2313            }
2314            writer.commit().await.unwrap();
2315        }
2316
2317        let index = Index::open(dir, config).await.unwrap();
2318        let total = index.num_docs().await.unwrap();
2319        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);
2320
2321        // Find EVERY needle
2322        for term in &needle_terms {
2323            let results = index.query(term, 10).await.unwrap();
2324            assert_eq!(
2325                results.hits.len(),
2326                1,
2327                "Should find exactly 1 doc for needle '{}'",
2328                term
2329            );
2330        }
2331
2332        // Verify hay term matches all hay docs
2333        let results = index.query("common", 500).await.unwrap();
2334        assert_eq!(
2335            results.hits.len(),
2336            hay_per_batch * 4,
2337            "Common term should match all {} hay docs",
2338            hay_per_batch * 4
2339        );
2340    }
2341}