Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39    index_documents_from_reader, index_json_document, parse_schema,
40};
41
/// Name of the slice cache file used when no other name is supplied.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
45/// Index configuration
46#[derive(Debug, Clone)]
47pub struct IndexConfig {
48    /// Number of threads for CPU-intensive tasks (search parallelism)
49    pub num_threads: usize,
50    /// Number of parallel segment builders (documents distributed round-robin)
51    pub num_indexing_threads: usize,
52    /// Number of threads for parallel block compression within each segment
53    pub num_compression_threads: usize,
54    /// Block cache size for term dictionary per segment
55    pub term_cache_blocks: usize,
56    /// Block cache size for document store per segment
57    pub store_cache_blocks: usize,
58    /// Max memory (bytes) across all builders before auto-commit (global limit)
59    pub max_indexing_memory_bytes: usize,
60    /// Merge policy for background segment merging
61    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
62    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
63    pub optimization: crate::structures::IndexOptimization,
64    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
65    pub reload_interval_ms: u64,
66    /// Maximum number of concurrent background merges (default: 4)
67    pub max_concurrent_merges: usize,
68}
69
70impl Default for IndexConfig {
71    fn default() -> Self {
72        #[cfg(feature = "native")]
73        let indexing_threads = crate::default_indexing_threads();
74        #[cfg(not(feature = "native"))]
75        let indexing_threads = 1;
76
77        #[cfg(feature = "native")]
78        let compression_threads = crate::default_compression_threads();
79        #[cfg(not(feature = "native"))]
80        let compression_threads = 1;
81
82        Self {
83            num_threads: indexing_threads,
84            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
85            num_compression_threads: compression_threads,
86            term_cache_blocks: 256,
87            store_cache_blocks: 32,
88            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
89            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90            optimization: crate::structures::IndexOptimization::default(),
91            reload_interval_ms: 1000, // 1 second default
92            max_concurrent_merges: 4,
93        }
94    }
95}
96
/// Asynchronous search index made up of multiple segments.
///
/// This is the entry point for search. It exposes:
/// - `Index::create()` / `Index::open()` to create or open an index
/// - `index.writer()` to obtain an IndexWriter for adding documents
/// - `index.reader()` to obtain an IndexReader for searching with a reload policy
///
/// Segment management itself is delegated to the SegmentManager.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Shared handle to the directory passed to `create` / `open`.
    directory: Arc<D>,
    /// Shared schema (supplied on create, taken from stored metadata on open).
    schema: Arc<Schema>,
    /// Configuration this index was created or opened with.
    config: IndexConfig,
    /// Segment manager: owns segments, tracker, metadata, and trained structures.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Reader built on first use and handed out again on later calls.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118    /// Create a new index in the directory
119    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120        let directory = Arc::new(directory);
121        let schema = Arc::new(schema);
122        let metadata = IndexMetadata::new((*schema).clone());
123
124        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125            Arc::clone(&directory),
126            Arc::clone(&schema),
127            metadata,
128            config.merge_policy.clone_box(),
129            config.term_cache_blocks,
130            config.max_concurrent_merges,
131        ));
132
133        // Save initial metadata
134        segment_manager.update_metadata(|_| {}).await?;
135
136        Ok(Self {
137            directory,
138            schema,
139            config,
140            segment_manager,
141            cached_reader: tokio::sync::OnceCell::new(),
142        })
143    }
144
145    /// Open an existing index from a directory
146    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
147        let directory = Arc::new(directory);
148
149        // Load metadata (includes schema)
150        let metadata = IndexMetadata::load(directory.as_ref()).await?;
151        let schema = Arc::new(metadata.schema.clone());
152
153        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
154            Arc::clone(&directory),
155            Arc::clone(&schema),
156            metadata,
157            config.merge_policy.clone_box(),
158            config.term_cache_blocks,
159            config.max_concurrent_merges,
160        ));
161
162        // Load trained structures into SegmentManager's ArcSwap
163        segment_manager.load_and_publish_trained().await;
164
165        Ok(Self {
166            directory,
167            schema,
168            config,
169            segment_manager,
170            cached_reader: tokio::sync::OnceCell::new(),
171        })
172    }
173
174    /// Get the schema
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178
179    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
180    pub fn schema_arc(&self) -> &Arc<Schema> {
181        &self.schema
182    }
183
184    /// Get a reference to the underlying directory
185    pub fn directory(&self) -> &D {
186        &self.directory
187    }
188
189    /// Get the segment manager
190    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
191        &self.segment_manager
192    }
193
194    /// Get an IndexReader for searching (with reload policy)
195    ///
196    /// The reader is cached and reused across calls. The reader's internal
197    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
198    pub async fn reader(&self) -> Result<&IndexReader<D>> {
199        self.cached_reader
200            .get_or_try_init(|| async {
201                IndexReader::from_segment_manager(
202                    Arc::clone(&self.schema),
203                    Arc::clone(&self.segment_manager),
204                    self.config.term_cache_blocks,
205                    self.config.reload_interval_ms,
206                )
207                .await
208            })
209            .await
210    }
211
212    /// Get the config
213    pub fn config(&self) -> &IndexConfig {
214        &self.config
215    }
216
217    /// Get segment readers for query execution (convenience method)
218    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
219        let reader = self.reader().await?;
220        let searcher = reader.searcher().await?;
221        Ok(searcher.segment_readers().to_vec())
222    }
223
224    /// Total number of documents across all segments
225    pub async fn num_docs(&self) -> Result<u32> {
226        let reader = self.reader().await?;
227        let searcher = reader.searcher().await?;
228        Ok(searcher.num_docs())
229    }
230
231    /// Get default fields for search
232    pub fn default_fields(&self) -> Vec<crate::Field> {
233        if !self.schema.default_fields().is_empty() {
234            self.schema.default_fields().to_vec()
235        } else {
236            self.schema
237                .fields()
238                .filter(|(_, entry)| {
239                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240                })
241                .map(|(field, _)| field)
242                .collect()
243        }
244    }
245
246    /// Get tokenizer registry
247    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
248        Arc::new(crate::tokenizer::TokenizerRegistry::default())
249    }
250
251    /// Create a query parser for this index
252    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
253        let default_fields = self.default_fields();
254        let tokenizers = self.tokenizers();
255
256        let query_routers = self.schema.query_routers();
257        if !query_routers.is_empty()
258            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
259        {
260            return crate::dsl::QueryLanguageParser::with_router(
261                Arc::clone(&self.schema),
262                default_fields,
263                tokenizers,
264                router,
265            );
266        }
267
268        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
269    }
270
271    /// Parse and search using a query string
272    pub async fn query(
273        &self,
274        query_str: &str,
275        limit: usize,
276    ) -> Result<crate::query::SearchResponse> {
277        self.query_offset(query_str, limit, 0).await
278    }
279
280    /// Query with offset for pagination
281    pub async fn query_offset(
282        &self,
283        query_str: &str,
284        limit: usize,
285        offset: usize,
286    ) -> Result<crate::query::SearchResponse> {
287        let parser = self.query_parser();
288        let query = parser
289            .parse(query_str)
290            .map_err(crate::error::Error::Query)?;
291        self.search_offset(query.as_ref(), limit, offset).await
292    }
293
294    /// Search and return results
295    pub async fn search(
296        &self,
297        query: &dyn crate::query::Query,
298        limit: usize,
299    ) -> Result<crate::query::SearchResponse> {
300        self.search_offset(query, limit, 0).await
301    }
302
303    /// Search with offset for pagination
304    pub async fn search_offset(
305        &self,
306        query: &dyn crate::query::Query,
307        limit: usize,
308        offset: usize,
309    ) -> Result<crate::query::SearchResponse> {
310        let reader = self.reader().await?;
311        let searcher = reader.searcher().await?;
312        let segments = searcher.segment_readers();
313
314        let fetch_limit = offset + limit;
315
316        let futures: Vec<_> = segments
317            .iter()
318            .map(|segment| {
319                let sid = segment.meta().id;
320                async move {
321                    let results =
322                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
323                    Ok::<_, crate::error::Error>(
324                        results
325                            .into_iter()
326                            .map(move |r| (sid, r))
327                            .collect::<Vec<_>>(),
328                    )
329                }
330            })
331            .collect();
332
333        let batches = futures::future::try_join_all(futures).await?;
334        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
335            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
336        for batch in batches {
337            all_results.extend(batch);
338        }
339
340        all_results.sort_by(|a, b| {
341            b.1.score
342                .partial_cmp(&a.1.score)
343                .unwrap_or(std::cmp::Ordering::Equal)
344        });
345
346        let total_hits = all_results.len() as u32;
347
348        let hits: Vec<crate::query::SearchHit> = all_results
349            .into_iter()
350            .skip(offset)
351            .take(limit)
352            .map(|(segment_id, result)| crate::query::SearchHit {
353                address: crate::query::DocAddress::new(segment_id, result.doc_id),
354                score: result.score,
355                matched_fields: result.extract_ordinals(),
356            })
357            .collect();
358
359        Ok(crate::query::SearchResponse { hits, total_hits })
360    }
361
362    /// Get a document by its unique address
363    pub async fn get_document(
364        &self,
365        address: &crate::query::DocAddress,
366    ) -> Result<Option<crate::dsl::Document>> {
367        let reader = self.reader().await?;
368        let searcher = reader.searcher().await?;
369        searcher.get_document(address).await
370    }
371
372    /// Get posting lists for a term across all segments
373    pub async fn get_postings(
374        &self,
375        field: crate::Field,
376        term: &[u8],
377    ) -> Result<
378        Vec<(
379            Arc<crate::segment::SegmentReader>,
380            crate::structures::BlockPostingList,
381        )>,
382    > {
383        let segments = self.segment_readers().await?;
384        let mut results = Vec::new();
385
386        for segment in segments {
387            if let Some(postings) = segment.get_postings(field, term).await? {
388                results.push((segment, postings));
389            }
390        }
391
392        Ok(results)
393    }
394}
395
/// Writer access for `Index` (compiled only with the `native` feature).
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Build an IndexWriter from this index for adding documents.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        IndexWriter::from_index(self)
    }
}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use crate::directories::RamDirectory;
409    use crate::dsl::{Document, SchemaBuilder};
410
411    #[tokio::test]
412    async fn test_index_create_and_search() {
413        let mut schema_builder = SchemaBuilder::default();
414        let title = schema_builder.add_text_field("title", true, true);
415        let body = schema_builder.add_text_field("body", true, true);
416        let schema = schema_builder.build();
417
418        let dir = RamDirectory::new();
419        let config = IndexConfig::default();
420
421        // Create index and add documents
422        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
423            .await
424            .unwrap();
425
426        let mut doc1 = Document::new();
427        doc1.add_text(title, "Hello World");
428        doc1.add_text(body, "This is the first document");
429        writer.add_document(doc1).unwrap();
430
431        let mut doc2 = Document::new();
432        doc2.add_text(title, "Goodbye World");
433        doc2.add_text(body, "This is the second document");
434        writer.add_document(doc2).unwrap();
435
436        writer.commit().await.unwrap();
437
438        // Open for reading
439        let index = Index::open(dir, config).await.unwrap();
440        assert_eq!(index.num_docs().await.unwrap(), 2);
441
442        // Check postings
443        let postings = index.get_postings(title, b"world").await.unwrap();
444        assert_eq!(postings.len(), 1); // One segment
445        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
446
447        // Retrieve document via searcher snapshot
448        let reader = index.reader().await.unwrap();
449        let searcher = reader.searcher().await.unwrap();
450        let doc = searcher.doc(0).await.unwrap().unwrap();
451        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
452    }
453
454    #[tokio::test]
455    async fn test_multiple_segments() {
456        let mut schema_builder = SchemaBuilder::default();
457        let title = schema_builder.add_text_field("title", true, true);
458        let schema = schema_builder.build();
459
460        let dir = RamDirectory::new();
461        let config = IndexConfig {
462            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
463            ..Default::default()
464        };
465
466        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
467            .await
468            .unwrap();
469
470        // Add documents in batches to create multiple segments
471        for batch in 0..3 {
472            for i in 0..5 {
473                let mut doc = Document::new();
474                doc.add_text(title, format!("Document {} batch {}", i, batch));
475                writer.add_document(doc).unwrap();
476            }
477            writer.commit().await.unwrap();
478        }
479
480        // Open and check
481        let index = Index::open(dir, config).await.unwrap();
482        assert_eq!(index.num_docs().await.unwrap(), 15);
483        // With queue-based indexing, exact segment count varies
484        assert!(
485            index.segment_readers().await.unwrap().len() >= 2,
486            "Expected multiple segments"
487        );
488    }
489
490    #[tokio::test]
491    async fn test_segment_merge() {
492        let mut schema_builder = SchemaBuilder::default();
493        let title = schema_builder.add_text_field("title", true, true);
494        let schema = schema_builder.build();
495
496        let dir = RamDirectory::new();
497        let config = IndexConfig {
498            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
499            ..Default::default()
500        };
501
502        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
503            .await
504            .unwrap();
505
506        // Create multiple segments by flushing between batches
507        for batch in 0..3 {
508            for i in 0..3 {
509                let mut doc = Document::new();
510                doc.add_text(title, format!("Document {} batch {}", i, batch));
511                writer.add_document(doc).unwrap();
512            }
513            writer.commit().await.unwrap();
514        }
515
516        // Should have multiple segments (at least 2, one per flush with docs)
517        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
518        assert!(
519            index.segment_readers().await.unwrap().len() >= 2,
520            "Expected multiple segments"
521        );
522
523        // Force merge
524        let mut writer = IndexWriter::open(dir.clone(), config.clone())
525            .await
526            .unwrap();
527        writer.force_merge().await.unwrap();
528
529        // Should have 1 segment now
530        let index = Index::open(dir, config).await.unwrap();
531        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
532        assert_eq!(index.num_docs().await.unwrap(), 9);
533
534        // Verify all documents accessible (order may vary with queue-based indexing)
535        let reader = index.reader().await.unwrap();
536        let searcher = reader.searcher().await.unwrap();
537        let mut found_docs = 0;
538        for i in 0..9 {
539            if searcher.doc(i).await.unwrap().is_some() {
540                found_docs += 1;
541            }
542        }
543        assert_eq!(found_docs, 9);
544    }
545
546    #[tokio::test]
547    async fn test_match_query() {
548        let mut schema_builder = SchemaBuilder::default();
549        let title = schema_builder.add_text_field("title", true, true);
550        let body = schema_builder.add_text_field("body", true, true);
551        let schema = schema_builder.build();
552
553        let dir = RamDirectory::new();
554        let config = IndexConfig::default();
555
556        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
557            .await
558            .unwrap();
559
560        let mut doc1 = Document::new();
561        doc1.add_text(title, "rust programming");
562        doc1.add_text(body, "Learn rust language");
563        writer.add_document(doc1).unwrap();
564
565        let mut doc2 = Document::new();
566        doc2.add_text(title, "python programming");
567        doc2.add_text(body, "Learn python language");
568        writer.add_document(doc2).unwrap();
569
570        writer.commit().await.unwrap();
571
572        let index = Index::open(dir, config).await.unwrap();
573
574        // Test match query with multiple default fields
575        let results = index.query("rust", 10).await.unwrap();
576        assert_eq!(results.hits.len(), 1);
577
578        // Test match query with multiple tokens
579        let results = index.query("rust programming", 10).await.unwrap();
580        assert!(!results.hits.is_empty());
581
582        // Verify hit has address (segment_id + doc_id)
583        let hit = &results.hits[0];
584        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
585
586        // Verify document retrieval by address
587        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
588        assert!(
589            !doc.field_values().is_empty(),
590            "Doc should have field values"
591        );
592
593        // Also verify doc retrieval via searcher snapshot
594        let reader = index.reader().await.unwrap();
595        let searcher = reader.searcher().await.unwrap();
596        let doc = searcher.doc(0).await.unwrap().unwrap();
597        assert!(
598            !doc.field_values().is_empty(),
599            "Doc should have field values"
600        );
601    }
602
603    #[tokio::test]
604    async fn test_slice_cache_warmup_and_load() {
605        use crate::directories::SliceCachingDirectory;
606
607        let mut schema_builder = SchemaBuilder::default();
608        let title = schema_builder.add_text_field("title", true, true);
609        let body = schema_builder.add_text_field("body", true, true);
610        let schema = schema_builder.build();
611
612        let dir = RamDirectory::new();
613        let config = IndexConfig::default();
614
615        // Create index with some documents
616        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
617            .await
618            .unwrap();
619
620        for i in 0..10 {
621            let mut doc = Document::new();
622            doc.add_text(title, format!("Document {} about rust", i));
623            doc.add_text(body, format!("This is body text number {}", i));
624            writer.add_document(doc).unwrap();
625        }
626        writer.commit().await.unwrap();
627
628        // Open with slice caching and perform some operations to warm up cache
629        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
630        let index = Index::open(caching_dir, config.clone()).await.unwrap();
631
632        // Perform a search to warm up the cache
633        let results = index.query("rust", 10).await.unwrap();
634        assert!(!results.hits.is_empty());
635
636        // Check cache stats - should have cached some data
637        let stats = index.directory.stats();
638        assert!(stats.total_bytes > 0, "Cache should have data after search");
639    }
640
641    #[tokio::test]
642    async fn test_multivalue_field_indexing_and_search() {
643        let mut schema_builder = SchemaBuilder::default();
644        let uris = schema_builder.add_text_field("uris", true, true);
645        let title = schema_builder.add_text_field("title", true, true);
646        let schema = schema_builder.build();
647
648        let dir = RamDirectory::new();
649        let config = IndexConfig::default();
650
651        // Create index and add document with multi-value field
652        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
653            .await
654            .unwrap();
655
656        let mut doc = Document::new();
657        doc.add_text(uris, "one");
658        doc.add_text(uris, "two");
659        doc.add_text(title, "Test Document");
660        writer.add_document(doc).unwrap();
661
662        // Add another document with different uris
663        let mut doc2 = Document::new();
664        doc2.add_text(uris, "three");
665        doc2.add_text(title, "Another Document");
666        writer.add_document(doc2).unwrap();
667
668        writer.commit().await.unwrap();
669
670        // Open for reading
671        let index = Index::open(dir, config).await.unwrap();
672        assert_eq!(index.num_docs().await.unwrap(), 2);
673
674        // Verify document retrieval preserves all values
675        let reader = index.reader().await.unwrap();
676        let searcher = reader.searcher().await.unwrap();
677        let doc = searcher.doc(0).await.unwrap().unwrap();
678        let all_uris: Vec<_> = doc.get_all(uris).collect();
679        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
680        assert_eq!(all_uris[0].as_text(), Some("one"));
681        assert_eq!(all_uris[1].as_text(), Some("two"));
682
683        // Verify to_json returns array for multi-value field
684        let json = doc.to_json(index.schema());
685        let uris_json = json.get("uris").unwrap();
686        assert!(uris_json.is_array(), "Multi-value field should be an array");
687        let uris_arr = uris_json.as_array().unwrap();
688        assert_eq!(uris_arr.len(), 2);
689        assert_eq!(uris_arr[0].as_str(), Some("one"));
690        assert_eq!(uris_arr[1].as_str(), Some("two"));
691
692        // Verify both values are searchable
693        let results = index.query("uris:one", 10).await.unwrap();
694        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
695        assert_eq!(results.hits[0].address.doc_id, 0);
696
697        let results = index.query("uris:two", 10).await.unwrap();
698        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
699        assert_eq!(results.hits[0].address.doc_id, 0);
700
701        let results = index.query("uris:three", 10).await.unwrap();
702        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
703        assert_eq!(results.hits[0].address.doc_id, 1);
704
705        // Verify searching for non-existent value returns no results
706        let results = index.query("uris:nonexistent", 10).await.unwrap();
707        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
708    }
709
710    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
711    ///
712    /// This test verifies that:
713    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
714    /// 2. Search results are correct regardless of WAND optimization
715    /// 3. Scores are reasonable for matching documents
716    #[tokio::test]
717    async fn test_wand_optimization_for_or_queries() {
718        use crate::query::{BooleanQuery, TermQuery};
719
720        let mut schema_builder = SchemaBuilder::default();
721        let content = schema_builder.add_text_field("content", true, true);
722        let schema = schema_builder.build();
723
724        let dir = RamDirectory::new();
725        let config = IndexConfig::default();
726
727        // Create index with documents containing various terms
728        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
729            .await
730            .unwrap();
731
732        // Doc 0: contains "rust" and "programming"
733        let mut doc = Document::new();
734        doc.add_text(content, "rust programming language is fast");
735        writer.add_document(doc).unwrap();
736
737        // Doc 1: contains "rust" only
738        let mut doc = Document::new();
739        doc.add_text(content, "rust is a systems language");
740        writer.add_document(doc).unwrap();
741
742        // Doc 2: contains "programming" only
743        let mut doc = Document::new();
744        doc.add_text(content, "programming is fun");
745        writer.add_document(doc).unwrap();
746
747        // Doc 3: contains "python" (neither rust nor programming)
748        let mut doc = Document::new();
749        doc.add_text(content, "python is easy to learn");
750        writer.add_document(doc).unwrap();
751
752        // Doc 4: contains both "rust" and "programming" multiple times
753        let mut doc = Document::new();
754        doc.add_text(content, "rust rust programming programming systems");
755        writer.add_document(doc).unwrap();
756
757        writer.commit().await.unwrap();
758
759        // Open for reading
760        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
761
762        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
763        let or_query = BooleanQuery::new()
764            .should(TermQuery::text(content, "rust"))
765            .should(TermQuery::text(content, "programming"));
766
767        let results = index.search(&or_query, 10).await.unwrap();
768
769        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
770        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
771
772        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
773        assert!(doc_ids.contains(&0), "Should find doc 0");
774        assert!(doc_ids.contains(&1), "Should find doc 1");
775        assert!(doc_ids.contains(&2), "Should find doc 2");
776        assert!(doc_ids.contains(&4), "Should find doc 4");
777        assert!(
778            !doc_ids.contains(&3),
779            "Should NOT find doc 3 (only has 'python')"
780        );
781
782        // Test 2: Single term query (should NOT use WAND, but still work)
783        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
784
785        let results = index.search(&single_query, 10).await.unwrap();
786        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
787
788        // Test 3: Query with MUST (should NOT use WAND)
789        let must_query = BooleanQuery::new()
790            .must(TermQuery::text(content, "rust"))
791            .should(TermQuery::text(content, "programming"));
792
793        let results = index.search(&must_query, 10).await.unwrap();
794        // Must have "rust", optionally "programming"
795        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
796
797        // Test 4: Query with MUST_NOT (should NOT use WAND)
798        let must_not_query = BooleanQuery::new()
799            .should(TermQuery::text(content, "rust"))
800            .should(TermQuery::text(content, "programming"))
801            .must_not(TermQuery::text(content, "systems"));
802
803        let results = index.search(&must_not_query, 10).await.unwrap();
804        // Should exclude docs with "systems" (doc 1 and 4)
805        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
806        assert!(
807            !doc_ids.contains(&1),
808            "Should NOT find doc 1 (has 'systems')"
809        );
810        assert!(
811            !doc_ids.contains(&4),
812            "Should NOT find doc 4 (has 'systems')"
813        );
814
815        // Test 5: Verify top-k limit works correctly with WAND
816        let or_query = BooleanQuery::new()
817            .should(TermQuery::text(content, "rust"))
818            .should(TermQuery::text(content, "programming"));
819
820        let results = index.search(&or_query, 2).await.unwrap();
821        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
822
823        // Top results should be docs that match both terms (higher scores)
824        // Doc 0 and 4 contain both "rust" and "programming"
825    }
826
827    /// Test that WAND optimization produces same results as non-WAND for correctness
828    #[tokio::test]
829    async fn test_wand_results_match_standard_boolean() {
830        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
831
832        let mut schema_builder = SchemaBuilder::default();
833        let content = schema_builder.add_text_field("content", true, true);
834        let schema = schema_builder.build();
835
836        let dir = RamDirectory::new();
837        let config = IndexConfig::default();
838
839        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
840            .await
841            .unwrap();
842
843        // Add several documents
844        for i in 0..10 {
845            let mut doc = Document::new();
846            let text = match i % 4 {
847                0 => "apple banana cherry",
848                1 => "apple orange",
849                2 => "banana grape",
850                _ => "cherry date",
851            };
852            doc.add_text(content, text);
853            writer.add_document(doc).unwrap();
854        }
855
856        writer.commit().await.unwrap();
857        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
858
859        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
860        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
861
862        let bool_query = BooleanQuery::new()
863            .should(TermQuery::text(content, "apple"))
864            .should(TermQuery::text(content, "banana"));
865
866        let wand_results = index.search(&wand_query, 10).await.unwrap();
867        let bool_results = index.search(&bool_query, 10).await.unwrap();
868
869        // Both should find the same documents
870        assert_eq!(
871            wand_results.hits.len(),
872            bool_results.hits.len(),
873            "WAND and Boolean should find same number of docs"
874        );
875
876        let wand_docs: std::collections::HashSet<u32> =
877            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
878        let bool_docs: std::collections::HashSet<u32> =
879            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
880
881        assert_eq!(
882            wand_docs, bool_docs,
883            "WAND and Boolean should find same documents"
884        );
885    }
886
887    #[tokio::test]
888    async fn test_vector_index_threshold_switch() {
889        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
890
891        // Create schema with dense vector field configured for IVF-RaBitQ
892        let mut schema_builder = SchemaBuilder::default();
893        let title = schema_builder.add_text_field("title", true, true);
894        let embedding = schema_builder.add_dense_vector_field_with_config(
895            "embedding",
896            true, // indexed
897            true, // stored
898            DenseVectorConfig {
899                dim: 8,
900                index_type: VectorIndexType::IvfRaBitQ,
901                quantization: DenseVectorQuantization::F32,
902                num_clusters: Some(4), // Small for test
903                nprobe: 2,
904                build_threshold: Some(50), // Build when we have 50+ vectors
905            },
906        );
907        let schema = schema_builder.build();
908
909        let dir = RamDirectory::new();
910        let config = IndexConfig::default();
911
912        // Phase 1: Add vectors below threshold (should use Flat index)
913        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
914            .await
915            .unwrap();
916
917        // Add 30 documents (below threshold of 50)
918        for i in 0..30 {
919            let mut doc = Document::new();
920            doc.add_text(title, format!("Document {}", i));
921            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
922            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
923            doc.add_dense_vector(embedding, vec);
924            writer.add_document(doc).unwrap();
925        }
926        writer.commit().await.unwrap();
927
928        // Open index and verify it's using Flat (not built yet)
929        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
930        assert!(
931            index.segment_manager.trained().is_none(),
932            "Should not have trained centroids below threshold"
933        );
934
935        // Search should work with Flat index
936        let query_vec: Vec<f32> = vec![0.5; 8];
937        let segments = index.segment_readers().await.unwrap();
938        assert!(!segments.is_empty());
939
940        let results = segments[0]
941            .search_dense_vector(
942                embedding,
943                &query_vec,
944                5,
945                0,
946                1,
947                crate::query::MultiValueCombiner::Max,
948            )
949            .await
950            .unwrap();
951        assert!(!results.is_empty(), "Flat search should return results");
952
953        // Phase 2: Add more vectors to cross threshold
954        let mut writer = IndexWriter::open(dir.clone(), config.clone())
955            .await
956            .unwrap();
957
958        // Add 30 more documents (total 60, above threshold of 50)
959        for i in 30..60 {
960            let mut doc = Document::new();
961            doc.add_text(title, format!("Document {}", i));
962            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
963            doc.add_dense_vector(embedding, vec);
964            writer.add_document(doc).unwrap();
965        }
966        writer.commit().await.unwrap();
967
968        // Manually trigger vector index build (no longer auto-triggered by commit)
969        writer.build_vector_index().await.unwrap();
970
971        // Reopen index and verify trained structures are loaded
972        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
973        assert!(
974            index.segment_manager.trained().is_some(),
975            "Should have loaded trained centroids for embedding field"
976        );
977
978        // Search should still work
979        let segments = index.segment_readers().await.unwrap();
980        let results = segments[0]
981            .search_dense_vector(
982                embedding,
983                &query_vec,
984                5,
985                0,
986                1,
987                crate::query::MultiValueCombiner::Max,
988            )
989            .await
990            .unwrap();
991        assert!(
992            !results.is_empty(),
993            "Search should return results after build"
994        );
995
996        // Phase 3: Verify calling build_vector_index again is a no-op
997        let writer = IndexWriter::open(dir.clone(), config.clone())
998            .await
999            .unwrap();
1000        writer.build_vector_index().await.unwrap(); // Should skip training
1001
1002        // Still built (trained structures present in ArcSwap)
1003        assert!(writer.segment_manager.trained().is_some());
1004    }
1005
1006    /// Multi-round merge: flush many small segments, merge, add more, merge again.
1007    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
1008    #[tokio::test]
1009    async fn test_multi_round_merge_with_search() {
1010        let mut schema_builder = SchemaBuilder::default();
1011        let title = schema_builder.add_text_field("title", true, true);
1012        let body = schema_builder.add_text_field("body", true, true);
1013        let schema = schema_builder.build();
1014
1015        let dir = RamDirectory::new();
1016        let config = IndexConfig {
1017            max_indexing_memory_bytes: 512,
1018            ..Default::default()
1019        };
1020
1021        // --- Round 1: 5 segments × 10 docs = 50 docs ---
1022        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1023            .await
1024            .unwrap();
1025
1026        for batch in 0..5 {
1027            for i in 0..10 {
1028                let mut doc = Document::new();
1029                doc.add_text(
1030                    title,
1031                    format!("alpha bravo charlie batch{} doc{}", batch, i),
1032                );
1033                doc.add_text(
1034                    body,
1035                    format!("the quick brown fox jumps over the lazy dog number {}", i),
1036                );
1037                writer.add_document(doc).unwrap();
1038            }
1039            writer.commit().await.unwrap();
1040        }
1041
1042        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1043        let pre_merge_segments = index.segment_readers().await.unwrap().len();
1044        assert!(
1045            pre_merge_segments >= 3,
1046            "Expected >=3 segments, got {}",
1047            pre_merge_segments
1048        );
1049        assert_eq!(index.num_docs().await.unwrap(), 50);
1050
1051        // Search before merge
1052        let results = index.query("alpha", 100).await.unwrap();
1053        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1054
1055        let results = index.query("fox", 100).await.unwrap();
1056        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1057
1058        // --- Merge round 1 ---
1059        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1060            .await
1061            .unwrap();
1062        writer.force_merge().await.unwrap();
1063
1064        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1065        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1066        assert_eq!(index.num_docs().await.unwrap(), 50);
1067
1068        // Search after first merge
1069        let results = index.query("alpha", 100).await.unwrap();
1070        assert_eq!(
1071            results.hits.len(),
1072            50,
1073            "all 50 docs should match 'alpha' after merge 1"
1074        );
1075
1076        let results = index.query("fox", 100).await.unwrap();
1077        assert_eq!(
1078            results.hits.len(),
1079            50,
1080            "all 50 docs should match 'fox' after merge 1"
1081        );
1082
1083        // Verify all docs retrievable
1084        let reader1 = index.reader().await.unwrap();
1085        let searcher1 = reader1.searcher().await.unwrap();
1086        for i in 0..50 {
1087            let doc = searcher1.doc(i).await.unwrap();
1088            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1089        }
1090
1091        // --- Round 2: add 30 more docs in 3 segments ---
1092        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1093            .await
1094            .unwrap();
1095        for batch in 0..3 {
1096            for i in 0..10 {
1097                let mut doc = Document::new();
1098                doc.add_text(
1099                    title,
1100                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1101                );
1102                doc.add_text(
1103                    body,
1104                    format!("the quick brown fox jumps again number {}", i),
1105                );
1106                writer.add_document(doc).unwrap();
1107            }
1108            writer.commit().await.unwrap();
1109        }
1110
1111        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1112        assert_eq!(index.num_docs().await.unwrap(), 80);
1113        assert!(
1114            index.segment_readers().await.unwrap().len() >= 2,
1115            "Should have >=2 segments after round 2 ingestion"
1116        );
1117
1118        // Search spans both old merged segment and new segments
1119        let results = index.query("fox", 100).await.unwrap();
1120        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1121
1122        let results = index.query("alpha", 100).await.unwrap();
1123        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1124
1125        let results = index.query("delta", 100).await.unwrap();
1126        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1127
1128        // --- Merge round 2 ---
1129        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1130            .await
1131            .unwrap();
1132        writer.force_merge().await.unwrap();
1133
1134        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1135        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1136        assert_eq!(index.num_docs().await.unwrap(), 80);
1137
1138        // All searches still correct after second merge
1139        let results = index.query("fox", 100).await.unwrap();
1140        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1141
1142        let results = index.query("alpha", 100).await.unwrap();
1143        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1144
1145        let results = index.query("delta", 100).await.unwrap();
1146        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1147
1148        // Verify all 80 docs retrievable
1149        let reader2 = index.reader().await.unwrap();
1150        let searcher2 = reader2.searcher().await.unwrap();
1151        for i in 0..80 {
1152            let doc = searcher2.doc(i).await.unwrap();
1153            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1154        }
1155    }
1156
1157    /// Large-scale merge: many segments with overlapping terms, verifying
1158    /// BM25 scoring and doc retrieval after merge.
1159    #[tokio::test]
1160    async fn test_large_scale_merge_correctness() {
1161        let mut schema_builder = SchemaBuilder::default();
1162        let title = schema_builder.add_text_field("title", true, true);
1163        let schema = schema_builder.build();
1164
1165        let dir = RamDirectory::new();
1166        let config = IndexConfig {
1167            max_indexing_memory_bytes: 512,
1168            ..Default::default()
1169        };
1170
1171        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1172            .await
1173            .unwrap();
1174
1175        // 8 batches × 25 docs = 200 docs total
1176        // Terms: "common" appears in all, "unique_N" appears in batch N only
1177        let total_docs = 200u32;
1178        for batch in 0..8 {
1179            for i in 0..25 {
1180                let mut doc = Document::new();
1181                doc.add_text(
1182                    title,
1183                    format!("common shared term unique_{} item{}", batch, i),
1184                );
1185                writer.add_document(doc).unwrap();
1186            }
1187            writer.commit().await.unwrap();
1188        }
1189
1190        // Verify pre-merge
1191        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1192        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1193
1194        let results = index.query("common", 300).await.unwrap();
1195        assert_eq!(
1196            results.hits.len(),
1197            total_docs as usize,
1198            "all docs should match 'common'"
1199        );
1200
1201        // Each unique_N matches exactly 25 docs
1202        for batch in 0..8 {
1203            let q = format!("unique_{}", batch);
1204            let results = index.query(&q, 100).await.unwrap();
1205            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1206        }
1207
1208        // Force merge
1209        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1210            .await
1211            .unwrap();
1212        writer.force_merge().await.unwrap();
1213
1214        // Verify post-merge: single segment, same results
1215        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1216        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1217        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1218
1219        let results = index.query("common", 300).await.unwrap();
1220        assert_eq!(results.hits.len(), total_docs as usize);
1221
1222        for batch in 0..8 {
1223            let q = format!("unique_{}", batch);
1224            let results = index.query(&q, 100).await.unwrap();
1225            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1226        }
1227
1228        // Verify doc retrieval for every doc
1229        let reader = index.reader().await.unwrap();
1230        let searcher = reader.searcher().await.unwrap();
1231        for i in 0..total_docs {
1232            let doc = searcher.doc(i).await.unwrap();
1233            assert!(doc.is_some(), "doc {} missing after merge", i);
1234        }
1235    }
1236
1237    /// Test that auto-merge is triggered by the merge policy during commit,
1238    /// without calling force_merge. Uses MmapDirectory and higher parallelism
1239    /// to reproduce production conditions.
1240    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1241    async fn test_auto_merge_triggered() {
1242        use crate::directories::MmapDirectory;
1243        let tmp_dir = tempfile::tempdir().unwrap();
1244        let dir = MmapDirectory::new(tmp_dir.path());
1245
1246        let mut schema_builder = SchemaBuilder::default();
1247        let title = schema_builder.add_text_field("title", true, true);
1248        let body = schema_builder.add_text_field("body", true, true);
1249        let schema = schema_builder.build();
1250
1251        // Aggressive policy: merge when 3 segments in same tier
1252        let config = IndexConfig {
1253            max_indexing_memory_bytes: 4096,
1254            num_indexing_threads: 4,
1255            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1256            ..Default::default()
1257        };
1258
1259        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1260            .await
1261            .unwrap();
1262
1263        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
1264        for batch in 0..12 {
1265            for i in 0..50 {
1266                let mut doc = Document::new();
1267                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1268                doc.add_text(
1269                    body,
1270                    format!(
1271                        "the quick brown fox jumps over lazy dog number {} round {}",
1272                        i, batch
1273                    ),
1274                );
1275                writer.add_document(doc).unwrap();
1276            }
1277            writer.commit().await.unwrap();
1278        }
1279
1280        let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1281
1282        // wait_for_merging_thread waits for the single in-flight merge. After it completes,
1283        // re-evaluate since segments accumulated while the merge was running.
1284        writer.wait_for_merging_thread().await;
1285        writer.maybe_merge().await;
1286        writer.wait_for_merging_thread().await;
1287
1288        // After commit + auto-merge, segment count should be reduced
1289        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1290        let segment_count = index.segment_readers().await.unwrap().len();
1291        eprintln!(
1292            "Segments: {} before merge, {} after auto-merge",
1293            pre_merge, segment_count
1294        );
1295        assert!(
1296            segment_count < pre_merge,
1297            "Expected auto-merge to reduce segments from {}, got {}",
1298            pre_merge,
1299            segment_count
1300        );
1301    }
1302
1303    /// Regression test: commit with dense vector fields + aggressive merge policy.
1304    /// Exercises the race where background merge deletes segment files while
1305    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
1306    /// Before the fix, this would fail with "IO error: No such file or directory".
1307    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1308    async fn test_commit_with_vectors_and_background_merge() {
1309        use crate::directories::MmapDirectory;
1310        use crate::dsl::DenseVectorConfig;
1311
1312        let tmp_dir = tempfile::tempdir().unwrap();
1313        let dir = MmapDirectory::new(tmp_dir.path());
1314
1315        let mut schema_builder = SchemaBuilder::default();
1316        let title = schema_builder.add_text_field("title", true, true);
1317        // RaBitQ with very low build_threshold so vector index building triggers during commit
1318        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1319        let embedding =
1320            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1321        let schema = schema_builder.build();
1322
1323        // Aggressive merge: triggers background merges at 3 segments per tier
1324        let config = IndexConfig {
1325            max_indexing_memory_bytes: 4096,
1326            num_indexing_threads: 4,
1327            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1328            ..Default::default()
1329        };
1330
1331        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1332            .await
1333            .unwrap();
1334
1335        // Create 12 segments with vectors — enough to trigger both
1336        // background merges (aggressive policy) and vector index building (threshold=10)
1337        for batch in 0..12 {
1338            for i in 0..5 {
1339                let mut doc = Document::new();
1340                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1341                // 8-dim random vector
1342                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1343                doc.add_dense_vector(embedding, vec);
1344                writer.add_document(doc).unwrap();
1345            }
1346            writer.commit().await.unwrap();
1347        }
1348        writer.wait_for_merging_thread().await;
1349
1350        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1351        let num_docs = index.num_docs().await.unwrap();
1352        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1353    }
1354
1355    /// Stress test: force_merge with many segments (iterative batching).
1356    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
1357    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1358    async fn test_force_merge_many_segments() {
1359        use crate::directories::MmapDirectory;
1360        let tmp_dir = tempfile::tempdir().unwrap();
1361        let dir = MmapDirectory::new(tmp_dir.path());
1362
1363        let mut schema_builder = SchemaBuilder::default();
1364        let title = schema_builder.add_text_field("title", true, true);
1365        let schema = schema_builder.build();
1366
1367        let config = IndexConfig {
1368            max_indexing_memory_bytes: 512,
1369            ..Default::default()
1370        };
1371
1372        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1373            .await
1374            .unwrap();
1375
1376        // Create 50 tiny segments
1377        for batch in 0..50 {
1378            for i in 0..3 {
1379                let mut doc = Document::new();
1380                doc.add_text(title, format!("term_{} batch_{}", i, batch));
1381                writer.add_document(doc).unwrap();
1382            }
1383            writer.commit().await.unwrap();
1384        }
1385        // Wait for background merges before reading segment count
1386        writer.wait_for_merging_thread().await;
1387
1388        let seg_ids = writer.segment_manager.get_segment_ids().await;
1389        let pre = seg_ids.len();
1390        eprintln!("Segments before force_merge: {}", pre);
1391        assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1392
1393        // Force merge all into one — should iterate in batches, not OOM
1394        writer.force_merge().await.unwrap();
1395
1396        let index2 = Index::open(dir, config).await.unwrap();
1397        let post = index2.segment_readers().await.unwrap().len();
1398        eprintln!("Segments after force_merge: {}", post);
1399        assert_eq!(post, 1);
1400        assert_eq!(index2.num_docs().await.unwrap(), 150);
1401    }
1402
1403    /// Test that background merges produce correct generation metadata.
1404    /// Creates many segments with aggressive policy, commits, waits for merges,
1405    /// and verifies that merged segments have generation >= 1 with correct ancestors.
1406    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1407    async fn test_background_merge_generation() {
1408        use crate::directories::MmapDirectory;
1409        let tmp_dir = tempfile::tempdir().unwrap();
1410        let dir = MmapDirectory::new(tmp_dir.path());
1411
1412        let mut schema_builder = SchemaBuilder::default();
1413        let title = schema_builder.add_text_field("title", true, true);
1414        let schema = schema_builder.build();
1415
1416        let config = IndexConfig {
1417            max_indexing_memory_bytes: 4096,
1418            num_indexing_threads: 2,
1419            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1420            ..Default::default()
1421        };
1422
1423        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1424            .await
1425            .unwrap();
1426
1427        // Create 15 small segments — enough for aggressive policy to trigger merges
1428        for batch in 0..15 {
1429            for i in 0..5 {
1430                let mut doc = Document::new();
1431                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1432                writer.add_document(doc).unwrap();
1433            }
1434            writer.commit().await.unwrap();
1435        }
1436        writer.wait_for_merging_thread().await;
1437
1438        // Read metadata and verify generation tracking
1439        let metas = writer
1440            .segment_manager
1441            .read_metadata(|m| m.segment_metas.clone())
1442            .await;
1443
1444        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1445        eprintln!(
1446            "Segments after merge: {}, max generation: {}",
1447            metas.len(),
1448            max_gen
1449        );
1450
1451        // Background merges should have produced at least one merged segment (gen >= 1)
1452        assert!(
1453            max_gen >= 1,
1454            "Expected at least one merged segment (gen >= 1), got max_gen={}",
1455            max_gen
1456        );
1457
1458        // Every merged segment (gen > 0) must have non-empty ancestors
1459        for (id, info) in &metas {
1460            if info.generation > 0 {
1461                assert!(
1462                    !info.ancestors.is_empty(),
1463                    "Segment {} has gen={} but no ancestors",
1464                    id,
1465                    info.generation
1466                );
1467            } else {
1468                assert!(
1469                    info.ancestors.is_empty(),
1470                    "Fresh segment {} has gen=0 but has ancestors",
1471                    id
1472                );
1473            }
1474        }
1475    }
1476
1477    /// Test that merging preserves every single document.
1478    /// Indexes 1000+ unique documents across many segments, force-merges,
1479    /// and verifies exact doc count and that every unique term is searchable.
1480    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1481    async fn test_merge_preserves_all_documents() {
1482        use crate::directories::MmapDirectory;
1483        let tmp_dir = tempfile::tempdir().unwrap();
1484        let dir = MmapDirectory::new(tmp_dir.path());
1485
1486        let mut schema_builder = SchemaBuilder::default();
1487        let title = schema_builder.add_text_field("title", true, true);
1488        let schema = schema_builder.build();
1489
1490        let config = IndexConfig {
1491            max_indexing_memory_bytes: 4096,
1492            ..Default::default()
1493        };
1494
1495        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1496            .await
1497            .unwrap();
1498
1499        let total_docs = 1200;
1500        let docs_per_batch = 60;
1501        let batches = total_docs / docs_per_batch;
1502
1503        // Each doc has a unique term "uid_N" for verification
1504        for batch in 0..batches {
1505            for i in 0..docs_per_batch {
1506                let doc_num = batch * docs_per_batch + i;
1507                let mut doc = Document::new();
1508                doc.add_text(
1509                    title,
1510                    format!("uid_{} common_term batch_{}", doc_num, batch),
1511                );
1512                writer.add_document(doc).unwrap();
1513            }
1514            writer.commit().await.unwrap();
1515        }
1516
1517        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1518        assert!(
1519            pre_segments >= 2,
1520            "Need multiple segments, got {}",
1521            pre_segments
1522        );
1523
1524        // Force merge to single segment
1525        writer.force_merge().await.unwrap();
1526
1527        let index = Index::open(dir, config).await.unwrap();
1528        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1529        assert_eq!(
1530            index.num_docs().await.unwrap(),
1531            total_docs as u32,
1532            "Doc count mismatch after force_merge"
1533        );
1534
1535        // Verify every unique document is searchable
1536        let results = index.query("common_term", total_docs + 100).await.unwrap();
1537        assert_eq!(
1538            results.hits.len(),
1539            total_docs,
1540            "common_term should match all docs"
1541        );
1542
1543        // Spot-check unique IDs across the range
1544        for check in [0, 1, total_docs / 2, total_docs - 1] {
1545            let q = format!("uid_{}", check);
1546            let results = index.query(&q, 10).await.unwrap();
1547            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1548        }
1549    }
1550
1551    /// Multi-round commit+merge: verify doc count grows correctly
1552    /// and no documents are lost across multiple merge cycles.
1553    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1554    async fn test_multi_round_merge_doc_integrity() {
1555        use crate::directories::MmapDirectory;
1556        let tmp_dir = tempfile::tempdir().unwrap();
1557        let dir = MmapDirectory::new(tmp_dir.path());
1558
1559        let mut schema_builder = SchemaBuilder::default();
1560        let title = schema_builder.add_text_field("title", true, true);
1561        let schema = schema_builder.build();
1562
1563        let config = IndexConfig {
1564            max_indexing_memory_bytes: 4096,
1565            num_indexing_threads: 2,
1566            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1567            ..Default::default()
1568        };
1569
1570        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1571            .await
1572            .unwrap();
1573
1574        let mut expected_total = 0u64;
1575
1576        // 4 rounds of: add docs → commit → wait for merges → verify count
1577        for round in 0..4 {
1578            let docs_this_round = 50 + round * 25; // 50, 75, 100, 125
1579            for batch in 0..5 {
1580                for i in 0..docs_this_round / 5 {
1581                    let mut doc = Document::new();
1582                    doc.add_text(
1583                        title,
1584                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
1585                    );
1586                    writer.add_document(doc).unwrap();
1587                }
1588                writer.commit().await.unwrap();
1589            }
1590            writer.wait_for_merging_thread().await;
1591
1592            expected_total += docs_this_round as u64;
1593
1594            let actual = writer
1595                .segment_manager
1596                .read_metadata(|m| {
1597                    m.segment_metas
1598                        .values()
1599                        .map(|s| s.num_docs as u64)
1600                        .sum::<u64>()
1601                })
1602                .await;
1603
1604            assert_eq!(
1605                actual, expected_total,
1606                "Round {}: expected {} docs, metadata reports {}",
1607                round, expected_total, actual
1608            );
1609        }
1610
1611        // Final verify: open fresh and query
1612        let index = Index::open(dir, config).await.unwrap();
1613        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);
1614
1615        let results = index
1616            .query("searchable", expected_total as usize + 100)
1617            .await
1618            .unwrap();
1619        assert_eq!(
1620            results.hits.len(),
1621            expected_total as usize,
1622            "All docs should match 'searchable'"
1623        );
1624
1625        // Check generation grew across rounds
1626        let metas = index
1627            .segment_manager()
1628            .read_metadata(|m| m.segment_metas.clone())
1629            .await;
1630        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1631        eprintln!(
1632            "Final: {} segments, {} docs, max generation={}",
1633            metas.len(),
1634            expected_total,
1635            max_gen
1636        );
1637        assert!(
1638            max_gen >= 1,
1639            "Multiple merge rounds should produce gen >= 1"
1640        );
1641    }
1642
1643    /// Sustained indexing: verify segment count stays O(logN) bounded.
1644    ///
1645    /// Indexes many small batches with aggressive merge policy and checks that
1646    /// the segment count never grows unbounded. With tiered merging the count
1647    /// should stay roughly O(segments_per_tier * num_tiers) ≈ O(log(N)).
1648    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1649    async fn test_segment_count_bounded_during_sustained_indexing() {
1650        use crate::directories::MmapDirectory;
1651        let tmp_dir = tempfile::tempdir().unwrap();
1652        let dir = MmapDirectory::new(tmp_dir.path());
1653
1654        let mut schema_builder = SchemaBuilder::default();
1655        let title = schema_builder.add_text_field("title", true, false);
1656        let schema = schema_builder.build();
1657
1658        let policy = crate::merge::TieredMergePolicy {
1659            segments_per_tier: 3,
1660            max_merge_at_once: 5,
1661            tier_factor: 10.0,
1662            tier_floor: 50,
1663            max_merged_docs: 1_000_000,
1664        };
1665
1666        let config = IndexConfig {
1667            max_indexing_memory_bytes: 4096, // tiny budget → frequent flushes
1668            num_indexing_threads: 1,
1669            merge_policy: Box::new(policy),
1670            max_concurrent_merges: 4,
1671            ..Default::default()
1672        };
1673
1674        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1675            .await
1676            .unwrap();
1677
1678        let num_commits = 40;
1679        let docs_per_commit = 30;
1680        let total_docs = num_commits * docs_per_commit;
1681        let mut max_segments_seen = 0usize;
1682
1683        for commit_idx in 0..num_commits {
1684            for i in 0..docs_per_commit {
1685                let mut doc = Document::new();
1686                doc.add_text(
1687                    title,
1688                    format!("doc_{} text", commit_idx * docs_per_commit + i),
1689                );
1690                writer.add_document(doc).unwrap();
1691            }
1692            writer.commit().await.unwrap();
1693
1694            // Give background merges a moment to run
1695            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1696
1697            let seg_count = writer.segment_manager.get_segment_ids().await.len();
1698            max_segments_seen = max_segments_seen.max(seg_count);
1699        }
1700
1701        // Wait for all merges to finish
1702        writer.wait_for_all_merges().await;
1703
1704        let final_segments = writer.segment_manager.get_segment_ids().await.len();
1705        let final_docs: u64 = writer
1706            .segment_manager
1707            .read_metadata(|m| {
1708                m.segment_metas
1709                    .values()
1710                    .map(|s| s.num_docs as u64)
1711                    .sum::<u64>()
1712            })
1713            .await;
1714
1715        eprintln!(
1716            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
1717            num_commits, total_docs, final_segments, max_segments_seen
1718        );
1719
1720        // With 1200 docs and segments_per_tier=3, tier_floor=50:
1721        // tier 0: ≤50 docs, tier 1: 50-500, tier 2: 500-5000
1722        // We should have at most ~3 segments per tier * ~3 tiers ≈ 9-12 segments at peak.
1723        // The key invariant: segment count must NOT grow linearly with commits.
1724        // 40 commits should NOT produce 40 segments.
1725        let max_allowed = num_commits / 2; // generous: at most half the commits as segments
1726        assert!(
1727            max_segments_seen <= max_allowed,
1728            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
1729             Merging is not keeping up.",
1730            max_segments_seen,
1731            max_allowed,
1732            num_commits
1733        );
1734
1735        // After all merges complete, should be well under the limit
1736        assert!(
1737            final_segments <= 10,
1738            "After all merges, expected ≤10 segments, got {}",
1739            final_segments
1740        );
1741
1742        // No data loss
1743        assert_eq!(
1744            final_docs, total_docs as u64,
1745            "Expected {} docs, metadata reports {}",
1746            total_docs, final_docs
1747        );
1748    }
1749
1750    // ========================================================================
1751    // Needle-in-haystack comprehensive tests
1752    // ========================================================================
1753
1754    /// Full-text needle-in-haystack: one unique term among many documents.
1755    /// Verifies exact retrieval, scoring, and document content after commit + reopen.
1756    #[tokio::test]
1757    async fn test_needle_fulltext_single_segment() {
1758        let mut sb = SchemaBuilder::default();
1759        let title = sb.add_text_field("title", true, true);
1760        let body = sb.add_text_field("body", true, true);
1761        let schema = sb.build();
1762
1763        let dir = RamDirectory::new();
1764        let config = IndexConfig::default();
1765        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1766            .await
1767            .unwrap();
1768
1769        // 100 hay documents
1770        for i in 0..100 {
1771            let mut doc = Document::new();
1772            doc.add_text(title, format!("Hay document number {}", i));
1773            doc.add_text(
1774                body,
1775                "common words repeated across all hay documents filler text",
1776            );
1777            writer.add_document(doc).unwrap();
1778        }
1779
1780        // 1 needle document (doc 100)
1781        let mut needle = Document::new();
1782        needle.add_text(title, "The unique needle xylophone");
1783        needle.add_text(
1784            body,
1785            "This document contains the extraordinary term xylophone",
1786        );
1787        // Insert needle among hay by re-adding remaining hay after it
1788        // Actually, we already added 100, so needle is doc 100
1789        writer.add_document(needle).unwrap();
1790
1791        // 50 more hay documents after needle
1792        for i in 100..150 {
1793            let mut doc = Document::new();
1794            doc.add_text(title, format!("More hay document {}", i));
1795            doc.add_text(body, "common words filler text again and again");
1796            writer.add_document(doc).unwrap();
1797        }
1798
1799        writer.commit().await.unwrap();
1800
1801        let index = Index::open(dir, config).await.unwrap();
1802        assert_eq!(index.num_docs().await.unwrap(), 151);
1803
1804        // Search for the needle term
1805        let results = index.query("xylophone", 10).await.unwrap();
1806        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
1807        assert!(results.hits[0].score > 0.0, "Score should be positive");
1808
1809        // Verify document content
1810        let doc = index
1811            .get_document(&results.hits[0].address)
1812            .await
1813            .unwrap()
1814            .unwrap();
1815        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1816        assert!(
1817            title_val.contains("xylophone"),
1818            "Retrieved doc should be the needle"
1819        );
1820
1821        // Search for common term — should return many
1822        let results = index.query("common", 200).await.unwrap();
1823        assert!(
1824            results.hits.len() >= 100,
1825            "Common term should match many docs"
1826        );
1827
1828        // Negative test — term that doesn't exist
1829        let results = index.query("nonexistentterm99999", 10).await.unwrap();
1830        assert_eq!(
1831            results.hits.len(),
1832            0,
1833            "Non-existent term should match nothing"
1834        );
1835    }
1836
1837    /// Full-text needle across multiple segments: ensures cross-segment search works.
1838    #[tokio::test]
1839    async fn test_needle_fulltext_multi_segment() {
1840        use crate::query::TermQuery;
1841
1842        let mut sb = SchemaBuilder::default();
1843        let content = sb.add_text_field("content", true, true);
1844        let schema = sb.build();
1845
1846        let dir = RamDirectory::new();
1847        let config = IndexConfig::default();
1848        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1849            .await
1850            .unwrap();
1851
1852        // Segment 1: 50 hay docs
1853        for i in 0..50 {
1854            let mut doc = Document::new();
1855            doc.add_text(content, format!("segment one hay document {}", i));
1856            writer.add_document(doc).unwrap();
1857        }
1858        writer.commit().await.unwrap();
1859
1860        // Segment 2: needle + 49 hay docs
1861        let mut needle = Document::new();
1862        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
1863        writer.add_document(needle).unwrap();
1864        for i in 0..49 {
1865            let mut doc = Document::new();
1866            doc.add_text(content, format!("segment two hay document {}", i));
1867            writer.add_document(doc).unwrap();
1868        }
1869        writer.commit().await.unwrap();
1870
1871        // Segment 3: 50 more hay docs
1872        for i in 0..50 {
1873            let mut doc = Document::new();
1874            doc.add_text(content, format!("segment three hay document {}", i));
1875            writer.add_document(doc).unwrap();
1876        }
1877        writer.commit().await.unwrap();
1878
1879        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1880        assert_eq!(index.num_docs().await.unwrap(), 150);
1881        let num_segments = index.segment_readers().await.unwrap().len();
1882        assert!(
1883            num_segments >= 2,
1884            "Should have multiple segments, got {}",
1885            num_segments
1886        );
1887
1888        // Find needle across segments
1889        let results = index.query("quetzalcoatl", 10).await.unwrap();
1890        assert_eq!(
1891            results.hits.len(),
1892            1,
1893            "Should find exactly 1 needle across segments"
1894        );
1895
1896        // Verify using TermQuery directly
1897        let reader = index.reader().await.unwrap();
1898        let searcher = reader.searcher().await.unwrap();
1899        let tq = TermQuery::text(content, "quetzalcoatl");
1900        let results = searcher.search(&tq, 10).await.unwrap();
1901        assert_eq!(results.len(), 1, "TermQuery should also find the needle");
1902
1903        // Verify content
1904        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
1905        let text = doc.get_first(content).unwrap().as_text().unwrap();
1906        assert!(
1907            text.contains("quetzalcoatl"),
1908            "Should retrieve needle content"
1909        );
1910
1911        // Cross-segment term that exists in all segments
1912        let results = index.query("document", 200).await.unwrap();
1913        assert!(
1914            results.hits.len() >= 149,
1915            "Should find hay docs across all segments"
1916        );
1917    }
1918
1919    /// Sparse vector needle-in-haystack: one document with unique dimensions.
1920    #[tokio::test]
1921    async fn test_needle_sparse_vector() {
1922        use crate::query::SparseVectorQuery;
1923
1924        let mut sb = SchemaBuilder::default();
1925        let title = sb.add_text_field("title", true, true);
1926        let sparse = sb.add_sparse_vector_field("sparse", true, true);
1927        let schema = sb.build();
1928
1929        let dir = RamDirectory::new();
1930        let config = IndexConfig::default();
1931        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1932            .await
1933            .unwrap();
1934
1935        // 100 hay documents with sparse vectors on dimensions 0-9
1936        for i in 0..100 {
1937            let mut doc = Document::new();
1938            doc.add_text(title, format!("Hay sparse doc {}", i));
1939            // All hay docs share dimensions 0-9 with varying weights
1940            let entries: Vec<(u32, f32)> = (0..10)
1941                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
1942                .collect();
1943            doc.add_sparse_vector(sparse, entries);
1944            writer.add_document(doc).unwrap();
1945        }
1946
1947        // Needle: unique dimensions 1000, 1001, 1002 (no other doc has these)
1948        let mut needle = Document::new();
1949        needle.add_text(title, "Needle sparse document");
1950        needle.add_sparse_vector(
1951            sparse,
1952            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
1953        );
1954        writer.add_document(needle).unwrap();
1955
1956        // 50 more hay docs
1957        for i in 100..150 {
1958            let mut doc = Document::new();
1959            doc.add_text(title, format!("More hay sparse doc {}", i));
1960            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
1961            doc.add_sparse_vector(sparse, entries);
1962            writer.add_document(doc).unwrap();
1963        }
1964
1965        writer.commit().await.unwrap();
1966
1967        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1968        assert_eq!(index.num_docs().await.unwrap(), 151);
1969
1970        // Query with needle's unique dimensions
1971        let reader = index.reader().await.unwrap();
1972        let searcher = reader.searcher().await.unwrap();
1973        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
1974        let results = searcher.search(&query, 10).await.unwrap();
1975        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
1976        assert!(results[0].score > 0.0, "Needle score should be positive");
1977
1978        // Verify it's the right document
1979        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
1980        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1981        assert_eq!(title_val, "Needle sparse document");
1982
1983        // Query with shared dimension — should match many
1984        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
1985        let results = searcher.search(&query_shared, 200).await.unwrap();
1986        assert!(
1987            results.len() >= 100,
1988            "Shared dim 5 should match many docs, got {}",
1989            results.len()
1990        );
1991
1992        // Query with non-existent dimension — should match nothing
1993        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
1994        let results = searcher.search(&query_missing, 10).await.unwrap();
1995        assert_eq!(
1996            results.len(),
1997            0,
1998            "Non-existent dimension should match nothing"
1999        );
2000    }
2001
2002    /// Sparse vector needle across multiple segments with merge.
2003    #[tokio::test]
2004    async fn test_needle_sparse_vector_multi_segment_merge() {
2005        use crate::query::SparseVectorQuery;
2006
2007        let mut sb = SchemaBuilder::default();
2008        let title = sb.add_text_field("title", true, true);
2009        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2010        let schema = sb.build();
2011
2012        let dir = RamDirectory::new();
2013        let config = IndexConfig::default();
2014        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2015            .await
2016            .unwrap();
2017
2018        // Segment 1: hay
2019        for i in 0..30 {
2020            let mut doc = Document::new();
2021            doc.add_text(title, format!("seg1 hay {}", i));
2022            doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2023            writer.add_document(doc).unwrap();
2024        }
2025        writer.commit().await.unwrap();
2026
2027        // Segment 2: needle + hay
2028        let mut needle = Document::new();
2029        needle.add_text(title, "seg2 needle");
2030        needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
2031        writer.add_document(needle).unwrap();
2032        for i in 0..29 {
2033            let mut doc = Document::new();
2034            doc.add_text(title, format!("seg2 hay {}", i));
2035            doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
2036            writer.add_document(doc).unwrap();
2037        }
2038        writer.commit().await.unwrap();
2039
2040        // Verify pre-merge
2041        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2042        assert_eq!(index.num_docs().await.unwrap(), 60);
2043
2044        let reader = index.reader().await.unwrap();
2045        let searcher = reader.searcher().await.unwrap();
2046        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2047        let results = searcher.search(&query, 10).await.unwrap();
2048        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
2049        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2050        assert_eq!(
2051            doc.get_first(title).unwrap().as_text().unwrap(),
2052            "seg2 needle"
2053        );
2054
2055        // Force merge
2056        let mut writer = IndexWriter::open(dir.clone(), config.clone())
2057            .await
2058            .unwrap();
2059        writer.force_merge().await.unwrap();
2060
2061        // Verify post-merge
2062        let index = Index::open(dir, config).await.unwrap();
2063        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
2064        assert_eq!(index.num_docs().await.unwrap(), 60);
2065
2066        let reader = index.reader().await.unwrap();
2067        let searcher = reader.searcher().await.unwrap();
2068        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2069        let results = searcher.search(&query, 10).await.unwrap();
2070        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
2071        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2072        assert_eq!(
2073            doc.get_first(title).unwrap().as_text().unwrap(),
2074            "seg2 needle"
2075        );
2076    }
2077
2078    /// Dense vector needle-in-haystack using brute-force (Flat) search.
2079    #[tokio::test]
2080    async fn test_needle_dense_vector_flat() {
2081        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2082        use crate::query::DenseVectorQuery;
2083
2084        let dim = 16;
2085        let mut sb = SchemaBuilder::default();
2086        let title = sb.add_text_field("title", true, true);
2087        let embedding = sb.add_dense_vector_field_with_config(
2088            "embedding",
2089            true,
2090            true,
2091            DenseVectorConfig {
2092                dim,
2093                index_type: VectorIndexType::Flat,
2094                quantization: crate::dsl::DenseVectorQuantization::F32,
2095                num_clusters: None,
2096                nprobe: 0,
2097                build_threshold: None,
2098            },
2099        );
2100        let schema = sb.build();
2101
2102        let dir = RamDirectory::new();
2103        let config = IndexConfig::default();
2104        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2105            .await
2106            .unwrap();
2107
2108        // 100 hay docs: vectors near origin (small random-ish values)
2109        for i in 0..100 {
2110            let mut doc = Document::new();
2111            doc.add_text(title, format!("Hay dense doc {}", i));
2112            // Hay vectors: low-magnitude, varying direction
2113            let vec: Vec<f32> = (0..dim)
2114                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
2115                .collect();
2116            doc.add_dense_vector(embedding, vec);
2117            writer.add_document(doc).unwrap();
2118        }
2119
2120        // Needle: vector pointing strongly in one direction [1,1,1,...,1]
2121        let mut needle = Document::new();
2122        needle.add_text(title, "Needle dense document");
2123        let needle_vec: Vec<f32> = vec![1.0; dim];
2124        needle.add_dense_vector(embedding, needle_vec.clone());
2125        writer.add_document(needle).unwrap();
2126
2127        writer.commit().await.unwrap();
2128
2129        let index = Index::open(dir, config).await.unwrap();
2130        assert_eq!(index.num_docs().await.unwrap(), 101);
2131
2132        // Query with the needle vector — it should be the top result
2133        let reader = index.reader().await.unwrap();
2134        let searcher = reader.searcher().await.unwrap();
2135        let query = DenseVectorQuery::new(embedding, needle_vec);
2136        let results = searcher.search(&query, 5).await.unwrap();
2137        assert!(!results.is_empty(), "Should find at least 1 result");
2138
2139        // The needle (exact match) should be the top result with highest score
2140        let top_doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2141        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
2142        assert_eq!(
2143            top_title, "Needle dense document",
2144            "Top result should be the needle (exact vector match)"
2145        );
2146        assert!(
2147            results[0].score > 0.9,
2148            "Exact match should have very high cosine similarity, got {}",
2149            results[0].score
2150        );
2151    }
2152
2153    /// Combined: full-text + sparse + dense in the same index.
2154    /// Verifies all three retrieval paths work independently on the same dataset.
2155    #[tokio::test]
2156    async fn test_needle_combined_all_modalities() {
2157        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2158        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};
2159
2160        let dim = 8;
2161        let mut sb = SchemaBuilder::default();
2162        let title = sb.add_text_field("title", true, true);
2163        let body = sb.add_text_field("body", true, true);
2164        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2165        let embedding = sb.add_dense_vector_field_with_config(
2166            "embedding",
2167            true,
2168            true,
2169            DenseVectorConfig {
2170                dim,
2171                index_type: VectorIndexType::Flat,
2172                quantization: crate::dsl::DenseVectorQuantization::F32,
2173                num_clusters: None,
2174                nprobe: 0,
2175                build_threshold: None,
2176            },
2177        );
2178        let schema = sb.build();
2179
2180        let dir = RamDirectory::new();
2181        let config = IndexConfig::default();
2182        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2183            .await
2184            .unwrap();
2185
2186        // 80 hay docs with all three modalities
2187        for i in 0..80u32 {
2188            let mut doc = Document::new();
2189            doc.add_text(title, format!("Hay doc {}", i));
2190            doc.add_text(body, "general filler text about nothing special");
2191            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
2192            let vec: Vec<f32> = (0..dim)
2193                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
2194                .collect();
2195            doc.add_dense_vector(embedding, vec);
2196            writer.add_document(doc).unwrap();
2197        }
2198
2199        // Needle doc: unique in ALL three modalities
2200        let mut needle = Document::new();
2201        needle.add_text(title, "The extraordinary rhinoceros");
2202        needle.add_text(
2203            body,
2204            "This document about rhinoceros is the only one with this word",
2205        );
2206        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
2207        let needle_vec = vec![0.9; dim];
2208        needle.add_dense_vector(embedding, needle_vec.clone());
2209        writer.add_document(needle).unwrap();
2210
2211        writer.commit().await.unwrap();
2212
2213        let index = Index::open(dir, config).await.unwrap();
2214        assert_eq!(index.num_docs().await.unwrap(), 81);
2215
2216        let reader = index.reader().await.unwrap();
2217        let searcher = reader.searcher().await.unwrap();
2218
2219        // --- Full-text needle ---
2220        let tq = TermQuery::text(body, "rhinoceros");
2221        let results = searcher.search(&tq, 10).await.unwrap();
2222        assert_eq!(
2223            results.len(),
2224            1,
2225            "Full-text: should find exactly the needle"
2226        );
2227        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2228        assert!(
2229            doc.get_first(title)
2230                .unwrap()
2231                .as_text()
2232                .unwrap()
2233                .contains("rhinoceros")
2234        );
2235
2236        // --- Sparse vector needle ---
2237        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
2238        let results = searcher.search(&sq, 10).await.unwrap();
2239        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
2240        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2241        assert!(
2242            doc.get_first(title)
2243                .unwrap()
2244                .as_text()
2245                .unwrap()
2246                .contains("rhinoceros")
2247        );
2248
2249        // --- Dense vector needle ---
2250        let dq = DenseVectorQuery::new(embedding, needle_vec);
2251        let results = searcher.search(&dq, 1).await.unwrap();
2252        assert!(!results.is_empty(), "Dense: should find at least 1 result");
2253        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2254        assert_eq!(
2255            doc.get_first(title).unwrap().as_text().unwrap(),
2256            "The extraordinary rhinoceros",
2257            "Dense: top-1 should be the needle"
2258        );
2259
2260        // Verify all three found the same document
2261        let ft_doc_id = {
2262            let tq = TermQuery::text(body, "rhinoceros");
2263            let r = searcher.search(&tq, 1).await.unwrap();
2264            r[0].doc_id
2265        };
2266        let sp_doc_id = {
2267            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
2268            let r = searcher.search(&sq, 1).await.unwrap();
2269            r[0].doc_id
2270        };
2271        let dn_doc_id = {
2272            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
2273            let r = searcher.search(&dq, 1).await.unwrap();
2274            r[0].doc_id
2275        };
2276
2277        assert_eq!(
2278            ft_doc_id, sp_doc_id,
2279            "Full-text and sparse should find same doc"
2280        );
2281        assert_eq!(
2282            sp_doc_id, dn_doc_id,
2283            "Sparse and dense should find same doc"
2284        );
2285    }
2286
2287    /// Stress test: many needles scattered across segments, verify ALL are found.
2288    #[tokio::test]
2289    async fn test_many_needles_all_found() {
2290        let mut sb = SchemaBuilder::default();
2291        let content = sb.add_text_field("content", true, true);
2292        let schema = sb.build();
2293
2294        let dir = RamDirectory::new();
2295        let config = IndexConfig::default();
2296        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2297            .await
2298            .unwrap();
2299
2300        let num_needles = 20usize;
2301        let hay_per_batch = 50usize;
2302        let needle_terms: Vec<String> = (0..num_needles)
2303            .map(|i| format!("uniqueneedle{:04}", i))
2304            .collect();
2305
2306        // Interleave needles with hay across commits
2307        for batch in 0..4 {
2308            // Hay
2309            for i in 0..hay_per_batch {
2310                let mut doc = Document::new();
2311                doc.add_text(
2312                    content,
2313                    format!("hay batch {} item {} common filler", batch, i),
2314                );
2315                writer.add_document(doc).unwrap();
2316            }
2317            // 5 needles per batch
2318            for n in 0..5 {
2319                let needle_idx = batch * 5 + n;
2320                let mut doc = Document::new();
2321                doc.add_text(
2322                    content,
2323                    format!("this is {} among many documents", needle_terms[needle_idx]),
2324                );
2325                writer.add_document(doc).unwrap();
2326            }
2327            writer.commit().await.unwrap();
2328        }
2329
2330        let index = Index::open(dir, config).await.unwrap();
2331        let total = index.num_docs().await.unwrap();
2332        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);
2333
2334        // Find EVERY needle
2335        for term in &needle_terms {
2336            let results = index.query(term, 10).await.unwrap();
2337            assert_eq!(
2338                results.hits.len(),
2339                1,
2340                "Should find exactly 1 doc for needle '{}'",
2341                term
2342            );
2343        }
2344
2345        // Verify hay term matches all hay docs
2346        let results = index.query("common", 500).await.unwrap();
2347        assert_eq!(
2348            results.hits.len(),
2349            hay_per_batch * 4,
2350            "Common term should match all {} hay docs",
2351            hay_per_batch * 4
2352        );
2353    }
2354}