Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39    index_documents_from_reader, index_json_document, parse_schema,
40};
41
/// File name used for the slice cache when none is specified.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
45/// Index configuration
46#[derive(Debug, Clone)]
47pub struct IndexConfig {
48    /// Number of threads for CPU-intensive tasks (search parallelism)
49    pub num_threads: usize,
50    /// Number of parallel segment builders (documents distributed round-robin)
51    pub num_indexing_threads: usize,
52    /// Number of threads for parallel block compression within each segment
53    pub num_compression_threads: usize,
54    /// Block cache size for term dictionary per segment
55    pub term_cache_blocks: usize,
56    /// Block cache size for document store per segment
57    pub store_cache_blocks: usize,
58    /// Max memory (bytes) across all builders before auto-commit (global limit)
59    pub max_indexing_memory_bytes: usize,
60    /// Merge policy for background segment merging
61    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
62    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
63    pub optimization: crate::structures::IndexOptimization,
64    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
65    pub reload_interval_ms: u64,
66    /// Maximum number of concurrent background merges (default: 4)
67    pub max_concurrent_merges: usize,
68}
69
70impl Default for IndexConfig {
71    fn default() -> Self {
72        #[cfg(feature = "native")]
73        let indexing_threads = crate::default_indexing_threads();
74        #[cfg(not(feature = "native"))]
75        let indexing_threads = 1;
76
77        #[cfg(feature = "native")]
78        let compression_threads = crate::default_compression_threads();
79        #[cfg(not(feature = "native"))]
80        let compression_threads = 1;
81
82        Self {
83            num_threads: indexing_threads,
84            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
85            num_compression_threads: compression_threads,
86            term_cache_blocks: 256,
87            store_cache_blocks: 32,
88            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
89            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90            optimization: crate::structures::IndexOptimization::default(),
91            reload_interval_ms: 1000, // 1 second default
92            max_concurrent_merges: 4,
93        }
94    }
95}
96
/// Asynchronous search index made of multiple segments.
///
/// This is the entry point for search. It holds the segment lifecycle and exposes:
/// - `Index::create()` / `Index::open()` to create or open an index
/// - `index.writer()` to obtain an IndexWriter for adding documents
/// - `index.reader()` to obtain an IndexReader for searching with a reload policy
///
/// Segment management itself is delegated to the SegmentManager.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Storage backend the index lives in.
    directory: Arc<D>,
    /// Schema shared with readers, writers and the segment manager.
    schema: Arc<Schema>,
    /// Configuration this index was created or opened with.
    config: IndexConfig,
    /// Segment manager: owns segments, tracker, metadata, and trained structures.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Reader built on first use and then shared by every later call.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118    /// Create a new index in the directory
119    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120        let directory = Arc::new(directory);
121        let schema = Arc::new(schema);
122        let metadata = IndexMetadata::new((*schema).clone());
123
124        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125            Arc::clone(&directory),
126            Arc::clone(&schema),
127            metadata,
128            config.merge_policy.clone_box(),
129            config.term_cache_blocks,
130            config.max_concurrent_merges,
131        ));
132
133        // Save initial metadata
134        segment_manager.update_metadata(|_| {}).await?;
135
136        Ok(Self {
137            directory,
138            schema,
139            config,
140            segment_manager,
141            cached_reader: tokio::sync::OnceCell::new(),
142        })
143    }
144
145    /// Open an existing index from a directory
146    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
147        let directory = Arc::new(directory);
148
149        // Load metadata (includes schema)
150        let metadata = IndexMetadata::load(directory.as_ref()).await?;
151        let schema = Arc::new(metadata.schema.clone());
152
153        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
154            Arc::clone(&directory),
155            Arc::clone(&schema),
156            metadata,
157            config.merge_policy.clone_box(),
158            config.term_cache_blocks,
159            config.max_concurrent_merges,
160        ));
161
162        // Load trained structures into SegmentManager's ArcSwap
163        segment_manager.load_and_publish_trained().await;
164
165        Ok(Self {
166            directory,
167            schema,
168            config,
169            segment_manager,
170            cached_reader: tokio::sync::OnceCell::new(),
171        })
172    }
173
174    /// Get the schema
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178
179    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
180    pub fn schema_arc(&self) -> &Arc<Schema> {
181        &self.schema
182    }
183
184    /// Get a reference to the underlying directory
185    pub fn directory(&self) -> &D {
186        &self.directory
187    }
188
189    /// Get the segment manager
190    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
191        &self.segment_manager
192    }
193
194    /// Get an IndexReader for searching (with reload policy)
195    ///
196    /// The reader is cached and reused across calls. The reader's internal
197    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
198    pub async fn reader(&self) -> Result<&IndexReader<D>> {
199        self.cached_reader
200            .get_or_try_init(|| async {
201                IndexReader::from_segment_manager(
202                    Arc::clone(&self.schema),
203                    Arc::clone(&self.segment_manager),
204                    self.config.term_cache_blocks,
205                    self.config.reload_interval_ms,
206                )
207                .await
208            })
209            .await
210    }
211
212    /// Get the config
213    pub fn config(&self) -> &IndexConfig {
214        &self.config
215    }
216
217    /// Get segment readers for query execution (convenience method)
218    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
219        let reader = self.reader().await?;
220        let searcher = reader.searcher().await?;
221        Ok(searcher.segment_readers().to_vec())
222    }
223
224    /// Total number of documents across all segments
225    pub async fn num_docs(&self) -> Result<u32> {
226        let reader = self.reader().await?;
227        let searcher = reader.searcher().await?;
228        Ok(searcher.num_docs())
229    }
230
231    /// Get default fields for search
232    pub fn default_fields(&self) -> Vec<crate::Field> {
233        if !self.schema.default_fields().is_empty() {
234            self.schema.default_fields().to_vec()
235        } else {
236            self.schema
237                .fields()
238                .filter(|(_, entry)| {
239                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240                })
241                .map(|(field, _)| field)
242                .collect()
243        }
244    }
245
246    /// Get tokenizer registry
247    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
248        Arc::new(crate::tokenizer::TokenizerRegistry::default())
249    }
250
251    /// Create a query parser for this index
252    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
253        let default_fields = self.default_fields();
254        let tokenizers = self.tokenizers();
255
256        let query_routers = self.schema.query_routers();
257        if !query_routers.is_empty()
258            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
259        {
260            return crate::dsl::QueryLanguageParser::with_router(
261                Arc::clone(&self.schema),
262                default_fields,
263                tokenizers,
264                router,
265            );
266        }
267
268        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
269    }
270
271    /// Parse and search using a query string
272    pub async fn query(
273        &self,
274        query_str: &str,
275        limit: usize,
276    ) -> Result<crate::query::SearchResponse> {
277        self.query_offset(query_str, limit, 0).await
278    }
279
280    /// Query with offset for pagination
281    pub async fn query_offset(
282        &self,
283        query_str: &str,
284        limit: usize,
285        offset: usize,
286    ) -> Result<crate::query::SearchResponse> {
287        let parser = self.query_parser();
288        let query = parser
289            .parse(query_str)
290            .map_err(crate::error::Error::Query)?;
291        self.search_offset(query.as_ref(), limit, offset).await
292    }
293
294    /// Search and return results
295    pub async fn search(
296        &self,
297        query: &dyn crate::query::Query,
298        limit: usize,
299    ) -> Result<crate::query::SearchResponse> {
300        self.search_offset(query, limit, 0).await
301    }
302
303    /// Search with offset for pagination
304    pub async fn search_offset(
305        &self,
306        query: &dyn crate::query::Query,
307        limit: usize,
308        offset: usize,
309    ) -> Result<crate::query::SearchResponse> {
310        let reader = self.reader().await?;
311        let searcher = reader.searcher().await?;
312        let segments = searcher.segment_readers();
313
314        let fetch_limit = offset + limit;
315
316        let futures: Vec<_> = segments
317            .iter()
318            .map(|segment| {
319                let sid = segment.meta().id;
320                async move {
321                    let results =
322                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
323                    Ok::<_, crate::error::Error>(
324                        results
325                            .into_iter()
326                            .map(move |r| (sid, r))
327                            .collect::<Vec<_>>(),
328                    )
329                }
330            })
331            .collect();
332
333        let batches = futures::future::try_join_all(futures).await?;
334        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
335            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
336        for batch in batches {
337            all_results.extend(batch);
338        }
339
340        all_results.sort_by(|a, b| {
341            b.1.score
342                .partial_cmp(&a.1.score)
343                .unwrap_or(std::cmp::Ordering::Equal)
344        });
345
346        let total_hits = all_results.len() as u32;
347
348        let hits: Vec<crate::query::SearchHit> = all_results
349            .into_iter()
350            .skip(offset)
351            .take(limit)
352            .map(|(segment_id, result)| crate::query::SearchHit {
353                address: crate::query::DocAddress::new(segment_id, result.doc_id),
354                score: result.score,
355                matched_fields: result.extract_ordinals(),
356            })
357            .collect();
358
359        Ok(crate::query::SearchResponse { hits, total_hits })
360    }
361
362    /// Get a document by its unique address
363    pub async fn get_document(
364        &self,
365        address: &crate::query::DocAddress,
366    ) -> Result<Option<crate::dsl::Document>> {
367        let reader = self.reader().await?;
368        let searcher = reader.searcher().await?;
369        searcher.get_document(address).await
370    }
371
372    /// Get posting lists for a term across all segments
373    pub async fn get_postings(
374        &self,
375        field: crate::Field,
376        term: &[u8],
377    ) -> Result<
378        Vec<(
379            Arc<crate::segment::SegmentReader>,
380            crate::structures::BlockPostingList,
381        )>,
382    > {
383        let segments = self.segment_readers().await?;
384        let mut results = Vec::new();
385
386        for segment in segments {
387            if let Some(postings) = segment.get_postings(field, term).await? {
388                results.push((segment, postings));
389            }
390        }
391
392        Ok(results)
393    }
394}
395
/// Writer access for Index (native builds only)
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Build an IndexWriter from this index for adding documents.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        IndexWriter::from_index(self)
    }
}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use crate::directories::RamDirectory;
409    use crate::dsl::{Document, SchemaBuilder};
410
411    #[tokio::test]
412    async fn test_index_create_and_search() {
413        let mut schema_builder = SchemaBuilder::default();
414        let title = schema_builder.add_text_field("title", true, true);
415        let body = schema_builder.add_text_field("body", true, true);
416        let schema = schema_builder.build();
417
418        let dir = RamDirectory::new();
419        let config = IndexConfig::default();
420
421        // Create index and add documents
422        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
423            .await
424            .unwrap();
425
426        let mut doc1 = Document::new();
427        doc1.add_text(title, "Hello World");
428        doc1.add_text(body, "This is the first document");
429        writer.add_document(doc1).unwrap();
430
431        let mut doc2 = Document::new();
432        doc2.add_text(title, "Goodbye World");
433        doc2.add_text(body, "This is the second document");
434        writer.add_document(doc2).unwrap();
435
436        writer.commit().await.unwrap();
437
438        // Open for reading
439        let index = Index::open(dir, config).await.unwrap();
440        assert_eq!(index.num_docs().await.unwrap(), 2);
441
442        // Check postings
443        let postings = index.get_postings(title, b"world").await.unwrap();
444        assert_eq!(postings.len(), 1); // One segment
445        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
446
447        // Retrieve document via searcher snapshot
448        let reader = index.reader().await.unwrap();
449        let searcher = reader.searcher().await.unwrap();
450        let doc = searcher.doc(0).await.unwrap().unwrap();
451        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
452    }
453
454    #[tokio::test]
455    async fn test_multiple_segments() {
456        let mut schema_builder = SchemaBuilder::default();
457        let title = schema_builder.add_text_field("title", true, true);
458        let schema = schema_builder.build();
459
460        let dir = RamDirectory::new();
461        let config = IndexConfig {
462            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
463            ..Default::default()
464        };
465
466        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
467            .await
468            .unwrap();
469
470        // Add documents in batches to create multiple segments
471        for batch in 0..3 {
472            for i in 0..5 {
473                let mut doc = Document::new();
474                doc.add_text(title, format!("Document {} batch {}", i, batch));
475                writer.add_document(doc).unwrap();
476            }
477            writer.commit().await.unwrap();
478        }
479
480        // Open and check
481        let index = Index::open(dir, config).await.unwrap();
482        assert_eq!(index.num_docs().await.unwrap(), 15);
483        // With queue-based indexing, exact segment count varies
484        assert!(
485            index.segment_readers().await.unwrap().len() >= 2,
486            "Expected multiple segments"
487        );
488    }
489
490    #[tokio::test]
491    async fn test_segment_merge() {
492        let mut schema_builder = SchemaBuilder::default();
493        let title = schema_builder.add_text_field("title", true, true);
494        let schema = schema_builder.build();
495
496        let dir = RamDirectory::new();
497        let config = IndexConfig {
498            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
499            ..Default::default()
500        };
501
502        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
503            .await
504            .unwrap();
505
506        // Create multiple segments by flushing between batches
507        for batch in 0..3 {
508            for i in 0..3 {
509                let mut doc = Document::new();
510                doc.add_text(title, format!("Document {} batch {}", i, batch));
511                writer.add_document(doc).unwrap();
512            }
513            writer.commit().await.unwrap();
514        }
515
516        // Should have multiple segments (at least 2, one per flush with docs)
517        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
518        assert!(
519            index.segment_readers().await.unwrap().len() >= 2,
520            "Expected multiple segments"
521        );
522
523        // Force merge
524        let mut writer = IndexWriter::open(dir.clone(), config.clone())
525            .await
526            .unwrap();
527        writer.force_merge().await.unwrap();
528
529        // Should have 1 segment now
530        let index = Index::open(dir, config).await.unwrap();
531        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
532        assert_eq!(index.num_docs().await.unwrap(), 9);
533
534        // Verify all documents accessible (order may vary with queue-based indexing)
535        let reader = index.reader().await.unwrap();
536        let searcher = reader.searcher().await.unwrap();
537        let mut found_docs = 0;
538        for i in 0..9 {
539            if searcher.doc(i).await.unwrap().is_some() {
540                found_docs += 1;
541            }
542        }
543        assert_eq!(found_docs, 9);
544    }
545
546    #[tokio::test]
547    async fn test_match_query() {
548        let mut schema_builder = SchemaBuilder::default();
549        let title = schema_builder.add_text_field("title", true, true);
550        let body = schema_builder.add_text_field("body", true, true);
551        let schema = schema_builder.build();
552
553        let dir = RamDirectory::new();
554        let config = IndexConfig::default();
555
556        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
557            .await
558            .unwrap();
559
560        let mut doc1 = Document::new();
561        doc1.add_text(title, "rust programming");
562        doc1.add_text(body, "Learn rust language");
563        writer.add_document(doc1).unwrap();
564
565        let mut doc2 = Document::new();
566        doc2.add_text(title, "python programming");
567        doc2.add_text(body, "Learn python language");
568        writer.add_document(doc2).unwrap();
569
570        writer.commit().await.unwrap();
571
572        let index = Index::open(dir, config).await.unwrap();
573
574        // Test match query with multiple default fields
575        let results = index.query("rust", 10).await.unwrap();
576        assert_eq!(results.hits.len(), 1);
577
578        // Test match query with multiple tokens
579        let results = index.query("rust programming", 10).await.unwrap();
580        assert!(!results.hits.is_empty());
581
582        // Verify hit has address (segment_id + doc_id)
583        let hit = &results.hits[0];
584        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
585
586        // Verify document retrieval by address
587        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
588        assert!(
589            !doc.field_values().is_empty(),
590            "Doc should have field values"
591        );
592
593        // Also verify doc retrieval via searcher snapshot
594        let reader = index.reader().await.unwrap();
595        let searcher = reader.searcher().await.unwrap();
596        let doc = searcher.doc(0).await.unwrap().unwrap();
597        assert!(
598            !doc.field_values().is_empty(),
599            "Doc should have field values"
600        );
601    }
602
603    #[tokio::test]
604    async fn test_slice_cache_warmup_and_load() {
605        use crate::directories::SliceCachingDirectory;
606
607        let mut schema_builder = SchemaBuilder::default();
608        let title = schema_builder.add_text_field("title", true, true);
609        let body = schema_builder.add_text_field("body", true, true);
610        let schema = schema_builder.build();
611
612        let dir = RamDirectory::new();
613        let config = IndexConfig::default();
614
615        // Create index with some documents
616        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
617            .await
618            .unwrap();
619
620        for i in 0..10 {
621            let mut doc = Document::new();
622            doc.add_text(title, format!("Document {} about rust", i));
623            doc.add_text(body, format!("This is body text number {}", i));
624            writer.add_document(doc).unwrap();
625        }
626        writer.commit().await.unwrap();
627
628        // Open with slice caching and perform some operations to warm up cache
629        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
630        let index = Index::open(caching_dir, config.clone()).await.unwrap();
631
632        // Perform a search to warm up the cache
633        let results = index.query("rust", 10).await.unwrap();
634        assert!(!results.hits.is_empty());
635
636        // Check cache stats - should have cached some data
637        let stats = index.directory.stats();
638        assert!(stats.total_bytes > 0, "Cache should have data after search");
639    }
640
641    #[tokio::test]
642    async fn test_multivalue_field_indexing_and_search() {
643        let mut schema_builder = SchemaBuilder::default();
644        let uris = schema_builder.add_text_field("uris", true, true);
645        let title = schema_builder.add_text_field("title", true, true);
646        let schema = schema_builder.build();
647
648        let dir = RamDirectory::new();
649        let config = IndexConfig::default();
650
651        // Create index and add document with multi-value field
652        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
653            .await
654            .unwrap();
655
656        let mut doc = Document::new();
657        doc.add_text(uris, "one");
658        doc.add_text(uris, "two");
659        doc.add_text(title, "Test Document");
660        writer.add_document(doc).unwrap();
661
662        // Add another document with different uris
663        let mut doc2 = Document::new();
664        doc2.add_text(uris, "three");
665        doc2.add_text(title, "Another Document");
666        writer.add_document(doc2).unwrap();
667
668        writer.commit().await.unwrap();
669
670        // Open for reading
671        let index = Index::open(dir, config).await.unwrap();
672        assert_eq!(index.num_docs().await.unwrap(), 2);
673
674        // Verify document retrieval preserves all values
675        let reader = index.reader().await.unwrap();
676        let searcher = reader.searcher().await.unwrap();
677        let doc = searcher.doc(0).await.unwrap().unwrap();
678        let all_uris: Vec<_> = doc.get_all(uris).collect();
679        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
680        assert_eq!(all_uris[0].as_text(), Some("one"));
681        assert_eq!(all_uris[1].as_text(), Some("two"));
682
683        // Verify to_json returns array for multi-value field
684        let json = doc.to_json(index.schema());
685        let uris_json = json.get("uris").unwrap();
686        assert!(uris_json.is_array(), "Multi-value field should be an array");
687        let uris_arr = uris_json.as_array().unwrap();
688        assert_eq!(uris_arr.len(), 2);
689        assert_eq!(uris_arr[0].as_str(), Some("one"));
690        assert_eq!(uris_arr[1].as_str(), Some("two"));
691
692        // Verify both values are searchable
693        let results = index.query("uris:one", 10).await.unwrap();
694        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
695        assert_eq!(results.hits[0].address.doc_id, 0);
696
697        let results = index.query("uris:two", 10).await.unwrap();
698        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
699        assert_eq!(results.hits[0].address.doc_id, 0);
700
701        let results = index.query("uris:three", 10).await.unwrap();
702        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
703        assert_eq!(results.hits[0].address.doc_id, 1);
704
705        // Verify searching for non-existent value returns no results
706        let results = index.query("uris:nonexistent", 10).await.unwrap();
707        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
708    }
709
710    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
711    ///
712    /// This test verifies that:
713    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
714    /// 2. Search results are correct regardless of WAND optimization
715    /// 3. Scores are reasonable for matching documents
716    #[tokio::test]
717    async fn test_wand_optimization_for_or_queries() {
718        use crate::query::{BooleanQuery, TermQuery};
719
720        let mut schema_builder = SchemaBuilder::default();
721        let content = schema_builder.add_text_field("content", true, true);
722        let schema = schema_builder.build();
723
724        let dir = RamDirectory::new();
725        let config = IndexConfig::default();
726
727        // Create index with documents containing various terms
728        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
729            .await
730            .unwrap();
731
732        // Doc 0: contains "rust" and "programming"
733        let mut doc = Document::new();
734        doc.add_text(content, "rust programming language is fast");
735        writer.add_document(doc).unwrap();
736
737        // Doc 1: contains "rust" only
738        let mut doc = Document::new();
739        doc.add_text(content, "rust is a systems language");
740        writer.add_document(doc).unwrap();
741
742        // Doc 2: contains "programming" only
743        let mut doc = Document::new();
744        doc.add_text(content, "programming is fun");
745        writer.add_document(doc).unwrap();
746
747        // Doc 3: contains "python" (neither rust nor programming)
748        let mut doc = Document::new();
749        doc.add_text(content, "python is easy to learn");
750        writer.add_document(doc).unwrap();
751
752        // Doc 4: contains both "rust" and "programming" multiple times
753        let mut doc = Document::new();
754        doc.add_text(content, "rust rust programming programming systems");
755        writer.add_document(doc).unwrap();
756
757        writer.commit().await.unwrap();
758
759        // Open for reading
760        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
761
762        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
763        let or_query = BooleanQuery::new()
764            .should(TermQuery::text(content, "rust"))
765            .should(TermQuery::text(content, "programming"));
766
767        let results = index.search(&or_query, 10).await.unwrap();
768
769        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
770        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
771
772        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
773        assert!(doc_ids.contains(&0), "Should find doc 0");
774        assert!(doc_ids.contains(&1), "Should find doc 1");
775        assert!(doc_ids.contains(&2), "Should find doc 2");
776        assert!(doc_ids.contains(&4), "Should find doc 4");
777        assert!(
778            !doc_ids.contains(&3),
779            "Should NOT find doc 3 (only has 'python')"
780        );
781
782        // Test 2: Single term query (should NOT use WAND, but still work)
783        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
784
785        let results = index.search(&single_query, 10).await.unwrap();
786        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
787
788        // Test 3: Query with MUST (should NOT use WAND)
789        let must_query = BooleanQuery::new()
790            .must(TermQuery::text(content, "rust"))
791            .should(TermQuery::text(content, "programming"));
792
793        let results = index.search(&must_query, 10).await.unwrap();
794        // Must have "rust", optionally "programming"
795        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
796
797        // Test 4: Query with MUST_NOT (should NOT use WAND)
798        let must_not_query = BooleanQuery::new()
799            .should(TermQuery::text(content, "rust"))
800            .should(TermQuery::text(content, "programming"))
801            .must_not(TermQuery::text(content, "systems"));
802
803        let results = index.search(&must_not_query, 10).await.unwrap();
804        // Should exclude docs with "systems" (doc 1 and 4)
805        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
806        assert!(
807            !doc_ids.contains(&1),
808            "Should NOT find doc 1 (has 'systems')"
809        );
810        assert!(
811            !doc_ids.contains(&4),
812            "Should NOT find doc 4 (has 'systems')"
813        );
814
815        // Test 5: Verify top-k limit works correctly with WAND
816        let or_query = BooleanQuery::new()
817            .should(TermQuery::text(content, "rust"))
818            .should(TermQuery::text(content, "programming"));
819
820        let results = index.search(&or_query, 2).await.unwrap();
821        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
822
823        // Top results should be docs that match both terms (higher scores)
824        // Doc 0 and 4 contain both "rust" and "programming"
825    }
826
827    /// Test that WAND optimization produces same results as non-WAND for correctness
828    #[tokio::test]
829    async fn test_wand_results_match_standard_boolean() {
830        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
831
832        let mut schema_builder = SchemaBuilder::default();
833        let content = schema_builder.add_text_field("content", true, true);
834        let schema = schema_builder.build();
835
836        let dir = RamDirectory::new();
837        let config = IndexConfig::default();
838
839        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
840            .await
841            .unwrap();
842
843        // Add several documents
844        for i in 0..10 {
845            let mut doc = Document::new();
846            let text = match i % 4 {
847                0 => "apple banana cherry",
848                1 => "apple orange",
849                2 => "banana grape",
850                _ => "cherry date",
851            };
852            doc.add_text(content, text);
853            writer.add_document(doc).unwrap();
854        }
855
856        writer.commit().await.unwrap();
857        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
858
859        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
860        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
861
862        let bool_query = BooleanQuery::new()
863            .should(TermQuery::text(content, "apple"))
864            .should(TermQuery::text(content, "banana"));
865
866        let wand_results = index.search(&wand_query, 10).await.unwrap();
867        let bool_results = index.search(&bool_query, 10).await.unwrap();
868
869        // Both should find the same documents
870        assert_eq!(
871            wand_results.hits.len(),
872            bool_results.hits.len(),
873            "WAND and Boolean should find same number of docs"
874        );
875
876        let wand_docs: std::collections::HashSet<u32> =
877            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
878        let bool_docs: std::collections::HashSet<u32> =
879            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
880
881        assert_eq!(
882            wand_docs, bool_docs,
883            "WAND and Boolean should find same documents"
884        );
885    }
886
887    #[tokio::test]
888    async fn test_vector_index_threshold_switch() {
889        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
890
891        // Create schema with dense vector field configured for IVF-RaBitQ
892        let mut schema_builder = SchemaBuilder::default();
893        let title = schema_builder.add_text_field("title", true, true);
894        let embedding = schema_builder.add_dense_vector_field_with_config(
895            "embedding",
896            true, // indexed
897            true, // stored
898            DenseVectorConfig {
899                dim: 8,
900                index_type: VectorIndexType::IvfRaBitQ,
901                quantization: DenseVectorQuantization::F32,
902                num_clusters: Some(4), // Small for test
903                nprobe: 2,
904                build_threshold: Some(50), // Build when we have 50+ vectors
905                unit_norm: false,
906            },
907        );
908        let schema = schema_builder.build();
909
910        let dir = RamDirectory::new();
911        let config = IndexConfig::default();
912
913        // Phase 1: Add vectors below threshold (should use Flat index)
914        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
915            .await
916            .unwrap();
917
918        // Add 30 documents (below threshold of 50)
919        for i in 0..30 {
920            let mut doc = Document::new();
921            doc.add_text(title, format!("Document {}", i));
922            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
923            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
924            doc.add_dense_vector(embedding, vec);
925            writer.add_document(doc).unwrap();
926        }
927        writer.commit().await.unwrap();
928
929        // Open index and verify it's using Flat (not built yet)
930        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
931        assert!(
932            index.segment_manager.trained().is_none(),
933            "Should not have trained centroids below threshold"
934        );
935
936        // Search should work with Flat index
937        let query_vec: Vec<f32> = vec![0.5; 8];
938        let segments = index.segment_readers().await.unwrap();
939        assert!(!segments.is_empty());
940
941        let results = segments[0]
942            .search_dense_vector(
943                embedding,
944                &query_vec,
945                5,
946                0,
947                1,
948                crate::query::MultiValueCombiner::Max,
949            )
950            .await
951            .unwrap();
952        assert!(!results.is_empty(), "Flat search should return results");
953
954        // Phase 2: Add more vectors to cross threshold
955        let mut writer = IndexWriter::open(dir.clone(), config.clone())
956            .await
957            .unwrap();
958
959        // Add 30 more documents (total 60, above threshold of 50)
960        for i in 30..60 {
961            let mut doc = Document::new();
962            doc.add_text(title, format!("Document {}", i));
963            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
964            doc.add_dense_vector(embedding, vec);
965            writer.add_document(doc).unwrap();
966        }
967        writer.commit().await.unwrap();
968
969        // Manually trigger vector index build (no longer auto-triggered by commit)
970        writer.build_vector_index().await.unwrap();
971
972        // Reopen index and verify trained structures are loaded
973        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
974        assert!(
975            index.segment_manager.trained().is_some(),
976            "Should have loaded trained centroids for embedding field"
977        );
978
979        // Search should still work
980        let segments = index.segment_readers().await.unwrap();
981        let results = segments[0]
982            .search_dense_vector(
983                embedding,
984                &query_vec,
985                5,
986                0,
987                1,
988                crate::query::MultiValueCombiner::Max,
989            )
990            .await
991            .unwrap();
992        assert!(
993            !results.is_empty(),
994            "Search should return results after build"
995        );
996
997        // Phase 3: Verify calling build_vector_index again is a no-op
998        let writer = IndexWriter::open(dir.clone(), config.clone())
999            .await
1000            .unwrap();
1001        writer.build_vector_index().await.unwrap(); // Should skip training
1002
1003        // Still built (trained structures present in ArcSwap)
1004        assert!(writer.segment_manager.trained().is_some());
1005    }
1006
1007    /// Multi-round merge: flush many small segments, merge, add more, merge again.
1008    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
1009    #[tokio::test]
1010    async fn test_multi_round_merge_with_search() {
1011        let mut schema_builder = SchemaBuilder::default();
1012        let title = schema_builder.add_text_field("title", true, true);
1013        let body = schema_builder.add_text_field("body", true, true);
1014        let schema = schema_builder.build();
1015
1016        let dir = RamDirectory::new();
1017        let config = IndexConfig {
1018            max_indexing_memory_bytes: 512,
1019            ..Default::default()
1020        };
1021
1022        // --- Round 1: 5 segments × 10 docs = 50 docs ---
1023        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1024            .await
1025            .unwrap();
1026
1027        for batch in 0..5 {
1028            for i in 0..10 {
1029                let mut doc = Document::new();
1030                doc.add_text(
1031                    title,
1032                    format!("alpha bravo charlie batch{} doc{}", batch, i),
1033                );
1034                doc.add_text(
1035                    body,
1036                    format!("the quick brown fox jumps over the lazy dog number {}", i),
1037                );
1038                writer.add_document(doc).unwrap();
1039            }
1040            writer.commit().await.unwrap();
1041        }
1042
1043        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1044        let pre_merge_segments = index.segment_readers().await.unwrap().len();
1045        assert!(
1046            pre_merge_segments >= 3,
1047            "Expected >=3 segments, got {}",
1048            pre_merge_segments
1049        );
1050        assert_eq!(index.num_docs().await.unwrap(), 50);
1051
1052        // Search before merge
1053        let results = index.query("alpha", 100).await.unwrap();
1054        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1055
1056        let results = index.query("fox", 100).await.unwrap();
1057        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1058
1059        // --- Merge round 1 ---
1060        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1061            .await
1062            .unwrap();
1063        writer.force_merge().await.unwrap();
1064
1065        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1066        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1067        assert_eq!(index.num_docs().await.unwrap(), 50);
1068
1069        // Search after first merge
1070        let results = index.query("alpha", 100).await.unwrap();
1071        assert_eq!(
1072            results.hits.len(),
1073            50,
1074            "all 50 docs should match 'alpha' after merge 1"
1075        );
1076
1077        let results = index.query("fox", 100).await.unwrap();
1078        assert_eq!(
1079            results.hits.len(),
1080            50,
1081            "all 50 docs should match 'fox' after merge 1"
1082        );
1083
1084        // Verify all docs retrievable
1085        let reader1 = index.reader().await.unwrap();
1086        let searcher1 = reader1.searcher().await.unwrap();
1087        for i in 0..50 {
1088            let doc = searcher1.doc(i).await.unwrap();
1089            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1090        }
1091
1092        // --- Round 2: add 30 more docs in 3 segments ---
1093        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1094            .await
1095            .unwrap();
1096        for batch in 0..3 {
1097            for i in 0..10 {
1098                let mut doc = Document::new();
1099                doc.add_text(
1100                    title,
1101                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1102                );
1103                doc.add_text(
1104                    body,
1105                    format!("the quick brown fox jumps again number {}", i),
1106                );
1107                writer.add_document(doc).unwrap();
1108            }
1109            writer.commit().await.unwrap();
1110        }
1111
1112        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1113        assert_eq!(index.num_docs().await.unwrap(), 80);
1114        assert!(
1115            index.segment_readers().await.unwrap().len() >= 2,
1116            "Should have >=2 segments after round 2 ingestion"
1117        );
1118
1119        // Search spans both old merged segment and new segments
1120        let results = index.query("fox", 100).await.unwrap();
1121        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1122
1123        let results = index.query("alpha", 100).await.unwrap();
1124        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1125
1126        let results = index.query("delta", 100).await.unwrap();
1127        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1128
1129        // --- Merge round 2 ---
1130        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1131            .await
1132            .unwrap();
1133        writer.force_merge().await.unwrap();
1134
1135        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1136        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1137        assert_eq!(index.num_docs().await.unwrap(), 80);
1138
1139        // All searches still correct after second merge
1140        let results = index.query("fox", 100).await.unwrap();
1141        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1142
1143        let results = index.query("alpha", 100).await.unwrap();
1144        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1145
1146        let results = index.query("delta", 100).await.unwrap();
1147        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1148
1149        // Verify all 80 docs retrievable
1150        let reader2 = index.reader().await.unwrap();
1151        let searcher2 = reader2.searcher().await.unwrap();
1152        for i in 0..80 {
1153            let doc = searcher2.doc(i).await.unwrap();
1154            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1155        }
1156    }
1157
1158    /// Large-scale merge: many segments with overlapping terms, verifying
1159    /// BM25 scoring and doc retrieval after merge.
1160    #[tokio::test]
1161    async fn test_large_scale_merge_correctness() {
1162        let mut schema_builder = SchemaBuilder::default();
1163        let title = schema_builder.add_text_field("title", true, true);
1164        let schema = schema_builder.build();
1165
1166        let dir = RamDirectory::new();
1167        let config = IndexConfig {
1168            max_indexing_memory_bytes: 512,
1169            ..Default::default()
1170        };
1171
1172        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1173            .await
1174            .unwrap();
1175
1176        // 8 batches × 25 docs = 200 docs total
1177        // Terms: "common" appears in all, "unique_N" appears in batch N only
1178        let total_docs = 200u32;
1179        for batch in 0..8 {
1180            for i in 0..25 {
1181                let mut doc = Document::new();
1182                doc.add_text(
1183                    title,
1184                    format!("common shared term unique_{} item{}", batch, i),
1185                );
1186                writer.add_document(doc).unwrap();
1187            }
1188            writer.commit().await.unwrap();
1189        }
1190
1191        // Verify pre-merge
1192        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1193        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1194
1195        let results = index.query("common", 300).await.unwrap();
1196        assert_eq!(
1197            results.hits.len(),
1198            total_docs as usize,
1199            "all docs should match 'common'"
1200        );
1201
1202        // Each unique_N matches exactly 25 docs
1203        for batch in 0..8 {
1204            let q = format!("unique_{}", batch);
1205            let results = index.query(&q, 100).await.unwrap();
1206            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1207        }
1208
1209        // Force merge
1210        let mut writer = IndexWriter::open(dir.clone(), config.clone())
1211            .await
1212            .unwrap();
1213        writer.force_merge().await.unwrap();
1214
1215        // Verify post-merge: single segment, same results
1216        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1217        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1218        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1219
1220        let results = index.query("common", 300).await.unwrap();
1221        assert_eq!(results.hits.len(), total_docs as usize);
1222
1223        for batch in 0..8 {
1224            let q = format!("unique_{}", batch);
1225            let results = index.query(&q, 100).await.unwrap();
1226            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1227        }
1228
1229        // Verify doc retrieval for every doc
1230        let reader = index.reader().await.unwrap();
1231        let searcher = reader.searcher().await.unwrap();
1232        for i in 0..total_docs {
1233            let doc = searcher.doc(i).await.unwrap();
1234            assert!(doc.is_some(), "doc {} missing after merge", i);
1235        }
1236    }
1237
1238    /// Test that auto-merge is triggered by the merge policy during commit,
1239    /// without calling force_merge. Uses MmapDirectory and higher parallelism
1240    /// to reproduce production conditions.
1241    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1242    async fn test_auto_merge_triggered() {
1243        use crate::directories::MmapDirectory;
1244        let tmp_dir = tempfile::tempdir().unwrap();
1245        let dir = MmapDirectory::new(tmp_dir.path());
1246
1247        let mut schema_builder = SchemaBuilder::default();
1248        let title = schema_builder.add_text_field("title", true, true);
1249        let body = schema_builder.add_text_field("body", true, true);
1250        let schema = schema_builder.build();
1251
1252        // Aggressive policy: merge when 3 segments in same tier
1253        let config = IndexConfig {
1254            max_indexing_memory_bytes: 4096,
1255            num_indexing_threads: 4,
1256            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1257            ..Default::default()
1258        };
1259
1260        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1261            .await
1262            .unwrap();
1263
1264        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
1265        for batch in 0..12 {
1266            for i in 0..50 {
1267                let mut doc = Document::new();
1268                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
1269                doc.add_text(
1270                    body,
1271                    format!(
1272                        "the quick brown fox jumps over lazy dog number {} round {}",
1273                        i, batch
1274                    ),
1275                );
1276                writer.add_document(doc).unwrap();
1277            }
1278            writer.commit().await.unwrap();
1279        }
1280
1281        let pre_merge = writer.segment_manager.get_segment_ids().await.len();
1282
1283        // wait_for_merging_thread waits for the single in-flight merge. After it completes,
1284        // re-evaluate since segments accumulated while the merge was running.
1285        writer.wait_for_merging_thread().await;
1286        writer.maybe_merge().await;
1287        writer.wait_for_merging_thread().await;
1288
1289        // After commit + auto-merge, segment count should be reduced
1290        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1291        let segment_count = index.segment_readers().await.unwrap().len();
1292        eprintln!(
1293            "Segments: {} before merge, {} after auto-merge",
1294            pre_merge, segment_count
1295        );
1296        assert!(
1297            segment_count < pre_merge,
1298            "Expected auto-merge to reduce segments from {}, got {}",
1299            pre_merge,
1300            segment_count
1301        );
1302    }
1303
1304    /// Regression test: commit with dense vector fields + aggressive merge policy.
1305    /// Exercises the race where background merge deletes segment files while
1306    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
1307    /// Before the fix, this would fail with "IO error: No such file or directory".
1308    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
1309    async fn test_commit_with_vectors_and_background_merge() {
1310        use crate::directories::MmapDirectory;
1311        use crate::dsl::DenseVectorConfig;
1312
1313        let tmp_dir = tempfile::tempdir().unwrap();
1314        let dir = MmapDirectory::new(tmp_dir.path());
1315
1316        let mut schema_builder = SchemaBuilder::default();
1317        let title = schema_builder.add_text_field("title", true, true);
1318        // RaBitQ with very low build_threshold so vector index building triggers during commit
1319        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
1320        let embedding =
1321            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
1322        let schema = schema_builder.build();
1323
1324        // Aggressive merge: triggers background merges at 3 segments per tier
1325        let config = IndexConfig {
1326            max_indexing_memory_bytes: 4096,
1327            num_indexing_threads: 4,
1328            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1329            ..Default::default()
1330        };
1331
1332        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1333            .await
1334            .unwrap();
1335
1336        // Create 12 segments with vectors — enough to trigger both
1337        // background merges (aggressive policy) and vector index building (threshold=10)
1338        for batch in 0..12 {
1339            for i in 0..5 {
1340                let mut doc = Document::new();
1341                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1342                // 8-dim random vector
1343                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1344                doc.add_dense_vector(embedding, vec);
1345                writer.add_document(doc).unwrap();
1346            }
1347            writer.commit().await.unwrap();
1348        }
1349        writer.wait_for_merging_thread().await;
1350
1351        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1352        let num_docs = index.num_docs().await.unwrap();
1353        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1354    }
1355
1356    /// Stress test: force_merge with many segments (iterative batching).
1357    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
1358    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1359    async fn test_force_merge_many_segments() {
1360        use crate::directories::MmapDirectory;
1361        let tmp_dir = tempfile::tempdir().unwrap();
1362        let dir = MmapDirectory::new(tmp_dir.path());
1363
1364        let mut schema_builder = SchemaBuilder::default();
1365        let title = schema_builder.add_text_field("title", true, true);
1366        let schema = schema_builder.build();
1367
1368        let config = IndexConfig {
1369            max_indexing_memory_bytes: 512,
1370            ..Default::default()
1371        };
1372
1373        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1374            .await
1375            .unwrap();
1376
1377        // Create 50 tiny segments
1378        for batch in 0..50 {
1379            for i in 0..3 {
1380                let mut doc = Document::new();
1381                doc.add_text(title, format!("term_{} batch_{}", i, batch));
1382                writer.add_document(doc).unwrap();
1383            }
1384            writer.commit().await.unwrap();
1385        }
1386        // Wait for background merges before reading segment count
1387        writer.wait_for_merging_thread().await;
1388
1389        let seg_ids = writer.segment_manager.get_segment_ids().await;
1390        let pre = seg_ids.len();
1391        eprintln!("Segments before force_merge: {}", pre);
1392        assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1393
1394        // Force merge all into one — should iterate in batches, not OOM
1395        writer.force_merge().await.unwrap();
1396
1397        let index2 = Index::open(dir, config).await.unwrap();
1398        let post = index2.segment_readers().await.unwrap().len();
1399        eprintln!("Segments after force_merge: {}", post);
1400        assert_eq!(post, 1);
1401        assert_eq!(index2.num_docs().await.unwrap(), 150);
1402    }
1403
1404    /// Test that background merges produce correct generation metadata.
1405    /// Creates many segments with aggressive policy, commits, waits for merges,
1406    /// and verifies that merged segments have generation >= 1 with correct ancestors.
1407    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1408    async fn test_background_merge_generation() {
1409        use crate::directories::MmapDirectory;
1410        let tmp_dir = tempfile::tempdir().unwrap();
1411        let dir = MmapDirectory::new(tmp_dir.path());
1412
1413        let mut schema_builder = SchemaBuilder::default();
1414        let title = schema_builder.add_text_field("title", true, true);
1415        let schema = schema_builder.build();
1416
1417        let config = IndexConfig {
1418            max_indexing_memory_bytes: 4096,
1419            num_indexing_threads: 2,
1420            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1421            ..Default::default()
1422        };
1423
1424        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1425            .await
1426            .unwrap();
1427
1428        // Create 15 small segments — enough for aggressive policy to trigger merges
1429        for batch in 0..15 {
1430            for i in 0..5 {
1431                let mut doc = Document::new();
1432                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1433                writer.add_document(doc).unwrap();
1434            }
1435            writer.commit().await.unwrap();
1436        }
1437        writer.wait_for_merging_thread().await;
1438
1439        // Read metadata and verify generation tracking
1440        let metas = writer
1441            .segment_manager
1442            .read_metadata(|m| m.segment_metas.clone())
1443            .await;
1444
1445        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1446        eprintln!(
1447            "Segments after merge: {}, max generation: {}",
1448            metas.len(),
1449            max_gen
1450        );
1451
1452        // Background merges should have produced at least one merged segment (gen >= 1)
1453        assert!(
1454            max_gen >= 1,
1455            "Expected at least one merged segment (gen >= 1), got max_gen={}",
1456            max_gen
1457        );
1458
1459        // Every merged segment (gen > 0) must have non-empty ancestors
1460        for (id, info) in &metas {
1461            if info.generation > 0 {
1462                assert!(
1463                    !info.ancestors.is_empty(),
1464                    "Segment {} has gen={} but no ancestors",
1465                    id,
1466                    info.generation
1467                );
1468            } else {
1469                assert!(
1470                    info.ancestors.is_empty(),
1471                    "Fresh segment {} has gen=0 but has ancestors",
1472                    id
1473                );
1474            }
1475        }
1476    }
1477
1478    /// Test that merging preserves every single document.
1479    /// Indexes 1000+ unique documents across many segments, force-merges,
1480    /// and verifies exact doc count and that every unique term is searchable.
1481    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1482    async fn test_merge_preserves_all_documents() {
1483        use crate::directories::MmapDirectory;
1484        let tmp_dir = tempfile::tempdir().unwrap();
1485        let dir = MmapDirectory::new(tmp_dir.path());
1486
1487        let mut schema_builder = SchemaBuilder::default();
1488        let title = schema_builder.add_text_field("title", true, true);
1489        let schema = schema_builder.build();
1490
1491        let config = IndexConfig {
1492            max_indexing_memory_bytes: 4096,
1493            ..Default::default()
1494        };
1495
1496        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1497            .await
1498            .unwrap();
1499
1500        let total_docs = 1200;
1501        let docs_per_batch = 60;
1502        let batches = total_docs / docs_per_batch;
1503
1504        // Each doc has a unique term "uid_N" for verification
1505        for batch in 0..batches {
1506            for i in 0..docs_per_batch {
1507                let doc_num = batch * docs_per_batch + i;
1508                let mut doc = Document::new();
1509                doc.add_text(
1510                    title,
1511                    format!("uid_{} common_term batch_{}", doc_num, batch),
1512                );
1513                writer.add_document(doc).unwrap();
1514            }
1515            writer.commit().await.unwrap();
1516        }
1517
1518        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1519        assert!(
1520            pre_segments >= 2,
1521            "Need multiple segments, got {}",
1522            pre_segments
1523        );
1524
1525        // Force merge to single segment
1526        writer.force_merge().await.unwrap();
1527
1528        let index = Index::open(dir, config).await.unwrap();
1529        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1530        assert_eq!(
1531            index.num_docs().await.unwrap(),
1532            total_docs as u32,
1533            "Doc count mismatch after force_merge"
1534        );
1535
1536        // Verify every unique document is searchable
1537        let results = index.query("common_term", total_docs + 100).await.unwrap();
1538        assert_eq!(
1539            results.hits.len(),
1540            total_docs,
1541            "common_term should match all docs"
1542        );
1543
1544        // Spot-check unique IDs across the range
1545        for check in [0, 1, total_docs / 2, total_docs - 1] {
1546            let q = format!("uid_{}", check);
1547            let results = index.query(&q, 10).await.unwrap();
1548            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1549        }
1550    }
1551
1552    /// Multi-round commit+merge: verify doc count grows correctly
1553    /// and no documents are lost across multiple merge cycles.
1554    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1555    async fn test_multi_round_merge_doc_integrity() {
1556        use crate::directories::MmapDirectory;
1557        let tmp_dir = tempfile::tempdir().unwrap();
1558        let dir = MmapDirectory::new(tmp_dir.path());
1559
1560        let mut schema_builder = SchemaBuilder::default();
1561        let title = schema_builder.add_text_field("title", true, true);
1562        let schema = schema_builder.build();
1563
1564        let config = IndexConfig {
1565            max_indexing_memory_bytes: 4096,
1566            num_indexing_threads: 2,
1567            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1568            ..Default::default()
1569        };
1570
1571        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1572            .await
1573            .unwrap();
1574
1575        let mut expected_total = 0u64;
1576
1577        // 4 rounds of: add docs → commit → wait for merges → verify count
1578        for round in 0..4 {
1579            let docs_this_round = 50 + round * 25; // 50, 75, 100, 125
1580            for batch in 0..5 {
1581                for i in 0..docs_this_round / 5 {
1582                    let mut doc = Document::new();
1583                    doc.add_text(
1584                        title,
1585                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
1586                    );
1587                    writer.add_document(doc).unwrap();
1588                }
1589                writer.commit().await.unwrap();
1590            }
1591            writer.wait_for_merging_thread().await;
1592
1593            expected_total += docs_this_round as u64;
1594
1595            let actual = writer
1596                .segment_manager
1597                .read_metadata(|m| {
1598                    m.segment_metas
1599                        .values()
1600                        .map(|s| s.num_docs as u64)
1601                        .sum::<u64>()
1602                })
1603                .await;
1604
1605            assert_eq!(
1606                actual, expected_total,
1607                "Round {}: expected {} docs, metadata reports {}",
1608                round, expected_total, actual
1609            );
1610        }
1611
1612        // Final verify: open fresh and query
1613        let index = Index::open(dir, config).await.unwrap();
1614        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);
1615
1616        let results = index
1617            .query("searchable", expected_total as usize + 100)
1618            .await
1619            .unwrap();
1620        assert_eq!(
1621            results.hits.len(),
1622            expected_total as usize,
1623            "All docs should match 'searchable'"
1624        );
1625
1626        // Check generation grew across rounds
1627        let metas = index
1628            .segment_manager()
1629            .read_metadata(|m| m.segment_metas.clone())
1630            .await;
1631        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1632        eprintln!(
1633            "Final: {} segments, {} docs, max generation={}",
1634            metas.len(),
1635            expected_total,
1636            max_gen
1637        );
1638        assert!(
1639            max_gen >= 1,
1640            "Multiple merge rounds should produce gen >= 1"
1641        );
1642    }
1643
1644    /// Sustained indexing: verify segment count stays O(logN) bounded.
1645    ///
1646    /// Indexes many small batches with aggressive merge policy and checks that
1647    /// the segment count never grows unbounded. With tiered merging the count
1648    /// should stay roughly O(segments_per_tier * num_tiers) ≈ O(log(N)).
1649    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1650    async fn test_segment_count_bounded_during_sustained_indexing() {
1651        use crate::directories::MmapDirectory;
1652        let tmp_dir = tempfile::tempdir().unwrap();
1653        let dir = MmapDirectory::new(tmp_dir.path());
1654
1655        let mut schema_builder = SchemaBuilder::default();
1656        let title = schema_builder.add_text_field("title", true, false);
1657        let schema = schema_builder.build();
1658
1659        let policy = crate::merge::TieredMergePolicy {
1660            segments_per_tier: 3,
1661            max_merge_at_once: 5,
1662            tier_factor: 10.0,
1663            tier_floor: 50,
1664            max_merged_docs: 1_000_000,
1665        };
1666
1667        let config = IndexConfig {
1668            max_indexing_memory_bytes: 4096, // tiny budget → frequent flushes
1669            num_indexing_threads: 1,
1670            merge_policy: Box::new(policy),
1671            max_concurrent_merges: 4,
1672            ..Default::default()
1673        };
1674
1675        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1676            .await
1677            .unwrap();
1678
1679        let num_commits = 40;
1680        let docs_per_commit = 30;
1681        let total_docs = num_commits * docs_per_commit;
1682        let mut max_segments_seen = 0usize;
1683
1684        for commit_idx in 0..num_commits {
1685            for i in 0..docs_per_commit {
1686                let mut doc = Document::new();
1687                doc.add_text(
1688                    title,
1689                    format!("doc_{} text", commit_idx * docs_per_commit + i),
1690                );
1691                writer.add_document(doc).unwrap();
1692            }
1693            writer.commit().await.unwrap();
1694
1695            // Give background merges a moment to run
1696            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1697
1698            let seg_count = writer.segment_manager.get_segment_ids().await.len();
1699            max_segments_seen = max_segments_seen.max(seg_count);
1700        }
1701
1702        // Wait for all merges to finish
1703        writer.wait_for_all_merges().await;
1704
1705        let final_segments = writer.segment_manager.get_segment_ids().await.len();
1706        let final_docs: u64 = writer
1707            .segment_manager
1708            .read_metadata(|m| {
1709                m.segment_metas
1710                    .values()
1711                    .map(|s| s.num_docs as u64)
1712                    .sum::<u64>()
1713            })
1714            .await;
1715
1716        eprintln!(
1717            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
1718            num_commits, total_docs, final_segments, max_segments_seen
1719        );
1720
1721        // With 1200 docs and segments_per_tier=3, tier_floor=50:
1722        // tier 0: ≤50 docs, tier 1: 50-500, tier 2: 500-5000
1723        // We should have at most ~3 segments per tier * ~3 tiers ≈ 9-12 segments at peak.
1724        // The key invariant: segment count must NOT grow linearly with commits.
1725        // 40 commits should NOT produce 40 segments.
1726        let max_allowed = num_commits / 2; // generous: at most half the commits as segments
1727        assert!(
1728            max_segments_seen <= max_allowed,
1729            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
1730             Merging is not keeping up.",
1731            max_segments_seen,
1732            max_allowed,
1733            num_commits
1734        );
1735
1736        // After all merges complete, should be well under the limit
1737        assert!(
1738            final_segments <= 10,
1739            "After all merges, expected ≤10 segments, got {}",
1740            final_segments
1741        );
1742
1743        // No data loss
1744        assert_eq!(
1745            final_docs, total_docs as u64,
1746            "Expected {} docs, metadata reports {}",
1747            total_docs, final_docs
1748        );
1749    }
1750
1751    // ========================================================================
1752    // Needle-in-haystack comprehensive tests
1753    // ========================================================================
1754
1755    /// Full-text needle-in-haystack: one unique term among many documents.
1756    /// Verifies exact retrieval, scoring, and document content after commit + reopen.
1757    #[tokio::test]
1758    async fn test_needle_fulltext_single_segment() {
1759        let mut sb = SchemaBuilder::default();
1760        let title = sb.add_text_field("title", true, true);
1761        let body = sb.add_text_field("body", true, true);
1762        let schema = sb.build();
1763
1764        let dir = RamDirectory::new();
1765        let config = IndexConfig::default();
1766        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1767            .await
1768            .unwrap();
1769
1770        // 100 hay documents
1771        for i in 0..100 {
1772            let mut doc = Document::new();
1773            doc.add_text(title, format!("Hay document number {}", i));
1774            doc.add_text(
1775                body,
1776                "common words repeated across all hay documents filler text",
1777            );
1778            writer.add_document(doc).unwrap();
1779        }
1780
1781        // 1 needle document (doc 100)
1782        let mut needle = Document::new();
1783        needle.add_text(title, "The unique needle xylophone");
1784        needle.add_text(
1785            body,
1786            "This document contains the extraordinary term xylophone",
1787        );
1788        // Insert needle among hay by re-adding remaining hay after it
1789        // Actually, we already added 100, so needle is doc 100
1790        writer.add_document(needle).unwrap();
1791
1792        // 50 more hay documents after needle
1793        for i in 100..150 {
1794            let mut doc = Document::new();
1795            doc.add_text(title, format!("More hay document {}", i));
1796            doc.add_text(body, "common words filler text again and again");
1797            writer.add_document(doc).unwrap();
1798        }
1799
1800        writer.commit().await.unwrap();
1801
1802        let index = Index::open(dir, config).await.unwrap();
1803        assert_eq!(index.num_docs().await.unwrap(), 151);
1804
1805        // Search for the needle term
1806        let results = index.query("xylophone", 10).await.unwrap();
1807        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
1808        assert!(results.hits[0].score > 0.0, "Score should be positive");
1809
1810        // Verify document content
1811        let doc = index
1812            .get_document(&results.hits[0].address)
1813            .await
1814            .unwrap()
1815            .unwrap();
1816        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1817        assert!(
1818            title_val.contains("xylophone"),
1819            "Retrieved doc should be the needle"
1820        );
1821
1822        // Search for common term — should return many
1823        let results = index.query("common", 200).await.unwrap();
1824        assert!(
1825            results.hits.len() >= 100,
1826            "Common term should match many docs"
1827        );
1828
1829        // Negative test — term that doesn't exist
1830        let results = index.query("nonexistentterm99999", 10).await.unwrap();
1831        assert_eq!(
1832            results.hits.len(),
1833            0,
1834            "Non-existent term should match nothing"
1835        );
1836    }
1837
1838    /// Full-text needle across multiple segments: ensures cross-segment search works.
1839    #[tokio::test]
1840    async fn test_needle_fulltext_multi_segment() {
1841        use crate::query::TermQuery;
1842
1843        let mut sb = SchemaBuilder::default();
1844        let content = sb.add_text_field("content", true, true);
1845        let schema = sb.build();
1846
1847        let dir = RamDirectory::new();
1848        let config = IndexConfig::default();
1849        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1850            .await
1851            .unwrap();
1852
1853        // Segment 1: 50 hay docs
1854        for i in 0..50 {
1855            let mut doc = Document::new();
1856            doc.add_text(content, format!("segment one hay document {}", i));
1857            writer.add_document(doc).unwrap();
1858        }
1859        writer.commit().await.unwrap();
1860
1861        // Segment 2: needle + 49 hay docs
1862        let mut needle = Document::new();
1863        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
1864        writer.add_document(needle).unwrap();
1865        for i in 0..49 {
1866            let mut doc = Document::new();
1867            doc.add_text(content, format!("segment two hay document {}", i));
1868            writer.add_document(doc).unwrap();
1869        }
1870        writer.commit().await.unwrap();
1871
1872        // Segment 3: 50 more hay docs
1873        for i in 0..50 {
1874            let mut doc = Document::new();
1875            doc.add_text(content, format!("segment three hay document {}", i));
1876            writer.add_document(doc).unwrap();
1877        }
1878        writer.commit().await.unwrap();
1879
1880        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1881        assert_eq!(index.num_docs().await.unwrap(), 150);
1882        let num_segments = index.segment_readers().await.unwrap().len();
1883        assert!(
1884            num_segments >= 2,
1885            "Should have multiple segments, got {}",
1886            num_segments
1887        );
1888
1889        // Find needle across segments
1890        let results = index.query("quetzalcoatl", 10).await.unwrap();
1891        assert_eq!(
1892            results.hits.len(),
1893            1,
1894            "Should find exactly 1 needle across segments"
1895        );
1896
1897        // Verify using TermQuery directly
1898        let reader = index.reader().await.unwrap();
1899        let searcher = reader.searcher().await.unwrap();
1900        let tq = TermQuery::text(content, "quetzalcoatl");
1901        let results = searcher.search(&tq, 10).await.unwrap();
1902        assert_eq!(results.len(), 1, "TermQuery should also find the needle");
1903
1904        // Verify content
1905        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
1906        let text = doc.get_first(content).unwrap().as_text().unwrap();
1907        assert!(
1908            text.contains("quetzalcoatl"),
1909            "Should retrieve needle content"
1910        );
1911
1912        // Cross-segment term that exists in all segments
1913        let results = index.query("document", 200).await.unwrap();
1914        assert!(
1915            results.hits.len() >= 149,
1916            "Should find hay docs across all segments"
1917        );
1918    }
1919
1920    /// Sparse vector needle-in-haystack: one document with unique dimensions.
1921    #[tokio::test]
1922    async fn test_needle_sparse_vector() {
1923        use crate::query::SparseVectorQuery;
1924
1925        let mut sb = SchemaBuilder::default();
1926        let title = sb.add_text_field("title", true, true);
1927        let sparse = sb.add_sparse_vector_field("sparse", true, true);
1928        let schema = sb.build();
1929
1930        let dir = RamDirectory::new();
1931        let config = IndexConfig::default();
1932        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1933            .await
1934            .unwrap();
1935
1936        // 100 hay documents with sparse vectors on dimensions 0-9
1937        for i in 0..100 {
1938            let mut doc = Document::new();
1939            doc.add_text(title, format!("Hay sparse doc {}", i));
1940            // All hay docs share dimensions 0-9 with varying weights
1941            let entries: Vec<(u32, f32)> = (0..10)
1942                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
1943                .collect();
1944            doc.add_sparse_vector(sparse, entries);
1945            writer.add_document(doc).unwrap();
1946        }
1947
1948        // Needle: unique dimensions 1000, 1001, 1002 (no other doc has these)
1949        let mut needle = Document::new();
1950        needle.add_text(title, "Needle sparse document");
1951        needle.add_sparse_vector(
1952            sparse,
1953            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
1954        );
1955        writer.add_document(needle).unwrap();
1956
1957        // 50 more hay docs
1958        for i in 100..150 {
1959            let mut doc = Document::new();
1960            doc.add_text(title, format!("More hay sparse doc {}", i));
1961            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
1962            doc.add_sparse_vector(sparse, entries);
1963            writer.add_document(doc).unwrap();
1964        }
1965
1966        writer.commit().await.unwrap();
1967
1968        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1969        assert_eq!(index.num_docs().await.unwrap(), 151);
1970
1971        // Query with needle's unique dimensions
1972        let reader = index.reader().await.unwrap();
1973        let searcher = reader.searcher().await.unwrap();
1974        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
1975        let results = searcher.search(&query, 10).await.unwrap();
1976        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
1977        assert!(results[0].score > 0.0, "Needle score should be positive");
1978
1979        // Verify it's the right document
1980        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
1981        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1982        assert_eq!(title_val, "Needle sparse document");
1983
1984        // Query with shared dimension — should match many
1985        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
1986        let results = searcher.search(&query_shared, 200).await.unwrap();
1987        assert!(
1988            results.len() >= 100,
1989            "Shared dim 5 should match many docs, got {}",
1990            results.len()
1991        );
1992
1993        // Query with non-existent dimension — should match nothing
1994        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
1995        let results = searcher.search(&query_missing, 10).await.unwrap();
1996        assert_eq!(
1997            results.len(),
1998            0,
1999            "Non-existent dimension should match nothing"
2000        );
2001    }
2002
2003    /// Sparse vector needle across multiple segments with merge.
2004    #[tokio::test]
2005    async fn test_needle_sparse_vector_multi_segment_merge() {
2006        use crate::query::SparseVectorQuery;
2007
2008        let mut sb = SchemaBuilder::default();
2009        let title = sb.add_text_field("title", true, true);
2010        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2011        let schema = sb.build();
2012
2013        let dir = RamDirectory::new();
2014        let config = IndexConfig::default();
2015        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2016            .await
2017            .unwrap();
2018
2019        // Segment 1: hay
2020        for i in 0..30 {
2021            let mut doc = Document::new();
2022            doc.add_text(title, format!("seg1 hay {}", i));
2023            doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2024            writer.add_document(doc).unwrap();
2025        }
2026        writer.commit().await.unwrap();
2027
2028        // Segment 2: needle + hay
2029        let mut needle = Document::new();
2030        needle.add_text(title, "seg2 needle");
2031        needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
2032        writer.add_document(needle).unwrap();
2033        for i in 0..29 {
2034            let mut doc = Document::new();
2035            doc.add_text(title, format!("seg2 hay {}", i));
2036            doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
2037            writer.add_document(doc).unwrap();
2038        }
2039        writer.commit().await.unwrap();
2040
2041        // Verify pre-merge
2042        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2043        assert_eq!(index.num_docs().await.unwrap(), 60);
2044
2045        let reader = index.reader().await.unwrap();
2046        let searcher = reader.searcher().await.unwrap();
2047        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2048        let results = searcher.search(&query, 10).await.unwrap();
2049        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
2050        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2051        assert_eq!(
2052            doc.get_first(title).unwrap().as_text().unwrap(),
2053            "seg2 needle"
2054        );
2055
2056        // Force merge
2057        let mut writer = IndexWriter::open(dir.clone(), config.clone())
2058            .await
2059            .unwrap();
2060        writer.force_merge().await.unwrap();
2061
2062        // Verify post-merge
2063        let index = Index::open(dir, config).await.unwrap();
2064        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
2065        assert_eq!(index.num_docs().await.unwrap(), 60);
2066
2067        let reader = index.reader().await.unwrap();
2068        let searcher = reader.searcher().await.unwrap();
2069        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2070        let results = searcher.search(&query, 10).await.unwrap();
2071        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
2072        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2073        assert_eq!(
2074            doc.get_first(title).unwrap().as_text().unwrap(),
2075            "seg2 needle"
2076        );
2077    }
2078
2079    /// Dense vector needle-in-haystack using brute-force (Flat) search.
2080    #[tokio::test]
2081    async fn test_needle_dense_vector_flat() {
2082        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2083        use crate::query::DenseVectorQuery;
2084
2085        let dim = 16;
2086        let mut sb = SchemaBuilder::default();
2087        let title = sb.add_text_field("title", true, true);
2088        let embedding = sb.add_dense_vector_field_with_config(
2089            "embedding",
2090            true,
2091            true,
2092            DenseVectorConfig {
2093                dim,
2094                index_type: VectorIndexType::Flat,
2095                quantization: crate::dsl::DenseVectorQuantization::F32,
2096                num_clusters: None,
2097                nprobe: 0,
2098                build_threshold: None,
2099                unit_norm: false,
2100            },
2101        );
2102        let schema = sb.build();
2103
2104        let dir = RamDirectory::new();
2105        let config = IndexConfig::default();
2106        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2107            .await
2108            .unwrap();
2109
2110        // 100 hay docs: vectors near origin (small random-ish values)
2111        for i in 0..100 {
2112            let mut doc = Document::new();
2113            doc.add_text(title, format!("Hay dense doc {}", i));
2114            // Hay vectors: low-magnitude, varying direction
2115            let vec: Vec<f32> = (0..dim)
2116                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
2117                .collect();
2118            doc.add_dense_vector(embedding, vec);
2119            writer.add_document(doc).unwrap();
2120        }
2121
2122        // Needle: vector pointing strongly in one direction [1,1,1,...,1]
2123        let mut needle = Document::new();
2124        needle.add_text(title, "Needle dense document");
2125        let needle_vec: Vec<f32> = vec![1.0; dim];
2126        needle.add_dense_vector(embedding, needle_vec.clone());
2127        writer.add_document(needle).unwrap();
2128
2129        writer.commit().await.unwrap();
2130
2131        let index = Index::open(dir, config).await.unwrap();
2132        assert_eq!(index.num_docs().await.unwrap(), 101);
2133
2134        // Query with the needle vector — it should be the top result
2135        let reader = index.reader().await.unwrap();
2136        let searcher = reader.searcher().await.unwrap();
2137        let query = DenseVectorQuery::new(embedding, needle_vec);
2138        let results = searcher.search(&query, 5).await.unwrap();
2139        assert!(!results.is_empty(), "Should find at least 1 result");
2140
2141        // The needle (exact match) should be the top result with highest score
2142        let top_doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2143        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
2144        assert_eq!(
2145            top_title, "Needle dense document",
2146            "Top result should be the needle (exact vector match)"
2147        );
2148        assert!(
2149            results[0].score > 0.9,
2150            "Exact match should have very high cosine similarity, got {}",
2151            results[0].score
2152        );
2153    }
2154
2155    /// Combined: full-text + sparse + dense in the same index.
2156    /// Verifies all three retrieval paths work independently on the same dataset.
2157    #[tokio::test]
2158    async fn test_needle_combined_all_modalities() {
2159        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2160        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};
2161
2162        let dim = 8;
2163        let mut sb = SchemaBuilder::default();
2164        let title = sb.add_text_field("title", true, true);
2165        let body = sb.add_text_field("body", true, true);
2166        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2167        let embedding = sb.add_dense_vector_field_with_config(
2168            "embedding",
2169            true,
2170            true,
2171            DenseVectorConfig {
2172                dim,
2173                index_type: VectorIndexType::Flat,
2174                quantization: crate::dsl::DenseVectorQuantization::F32,
2175                num_clusters: None,
2176                nprobe: 0,
2177                build_threshold: None,
2178                unit_norm: false,
2179            },
2180        );
2181        let schema = sb.build();
2182
2183        let dir = RamDirectory::new();
2184        let config = IndexConfig::default();
2185        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2186            .await
2187            .unwrap();
2188
2189        // 80 hay docs with all three modalities
2190        for i in 0..80u32 {
2191            let mut doc = Document::new();
2192            doc.add_text(title, format!("Hay doc {}", i));
2193            doc.add_text(body, "general filler text about nothing special");
2194            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
2195            let vec: Vec<f32> = (0..dim)
2196                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
2197                .collect();
2198            doc.add_dense_vector(embedding, vec);
2199            writer.add_document(doc).unwrap();
2200        }
2201
2202        // Needle doc: unique in ALL three modalities
2203        let mut needle = Document::new();
2204        needle.add_text(title, "The extraordinary rhinoceros");
2205        needle.add_text(
2206            body,
2207            "This document about rhinoceros is the only one with this word",
2208        );
2209        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
2210        let needle_vec = vec![0.9; dim];
2211        needle.add_dense_vector(embedding, needle_vec.clone());
2212        writer.add_document(needle).unwrap();
2213
2214        writer.commit().await.unwrap();
2215
2216        let index = Index::open(dir, config).await.unwrap();
2217        assert_eq!(index.num_docs().await.unwrap(), 81);
2218
2219        let reader = index.reader().await.unwrap();
2220        let searcher = reader.searcher().await.unwrap();
2221
2222        // --- Full-text needle ---
2223        let tq = TermQuery::text(body, "rhinoceros");
2224        let results = searcher.search(&tq, 10).await.unwrap();
2225        assert_eq!(
2226            results.len(),
2227            1,
2228            "Full-text: should find exactly the needle"
2229        );
2230        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2231        assert!(
2232            doc.get_first(title)
2233                .unwrap()
2234                .as_text()
2235                .unwrap()
2236                .contains("rhinoceros")
2237        );
2238
2239        // --- Sparse vector needle ---
2240        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
2241        let results = searcher.search(&sq, 10).await.unwrap();
2242        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
2243        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2244        assert!(
2245            doc.get_first(title)
2246                .unwrap()
2247                .as_text()
2248                .unwrap()
2249                .contains("rhinoceros")
2250        );
2251
2252        // --- Dense vector needle ---
2253        let dq = DenseVectorQuery::new(embedding, needle_vec);
2254        let results = searcher.search(&dq, 1).await.unwrap();
2255        assert!(!results.is_empty(), "Dense: should find at least 1 result");
2256        let doc = searcher.doc(results[0].doc_id).await.unwrap().unwrap();
2257        assert_eq!(
2258            doc.get_first(title).unwrap().as_text().unwrap(),
2259            "The extraordinary rhinoceros",
2260            "Dense: top-1 should be the needle"
2261        );
2262
2263        // Verify all three found the same document
2264        let ft_doc_id = {
2265            let tq = TermQuery::text(body, "rhinoceros");
2266            let r = searcher.search(&tq, 1).await.unwrap();
2267            r[0].doc_id
2268        };
2269        let sp_doc_id = {
2270            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
2271            let r = searcher.search(&sq, 1).await.unwrap();
2272            r[0].doc_id
2273        };
2274        let dn_doc_id = {
2275            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
2276            let r = searcher.search(&dq, 1).await.unwrap();
2277            r[0].doc_id
2278        };
2279
2280        assert_eq!(
2281            ft_doc_id, sp_doc_id,
2282            "Full-text and sparse should find same doc"
2283        );
2284        assert_eq!(
2285            sp_doc_id, dn_doc_id,
2286            "Sparse and dense should find same doc"
2287        );
2288    }
2289
2290    /// Stress test: many needles scattered across segments, verify ALL are found.
2291    #[tokio::test]
2292    async fn test_many_needles_all_found() {
2293        let mut sb = SchemaBuilder::default();
2294        let content = sb.add_text_field("content", true, true);
2295        let schema = sb.build();
2296
2297        let dir = RamDirectory::new();
2298        let config = IndexConfig::default();
2299        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2300            .await
2301            .unwrap();
2302
2303        let num_needles = 20usize;
2304        let hay_per_batch = 50usize;
2305        let needle_terms: Vec<String> = (0..num_needles)
2306            .map(|i| format!("uniqueneedle{:04}", i))
2307            .collect();
2308
2309        // Interleave needles with hay across commits
2310        for batch in 0..4 {
2311            // Hay
2312            for i in 0..hay_per_batch {
2313                let mut doc = Document::new();
2314                doc.add_text(
2315                    content,
2316                    format!("hay batch {} item {} common filler", batch, i),
2317                );
2318                writer.add_document(doc).unwrap();
2319            }
2320            // 5 needles per batch
2321            for n in 0..5 {
2322                let needle_idx = batch * 5 + n;
2323                let mut doc = Document::new();
2324                doc.add_text(
2325                    content,
2326                    format!("this is {} among many documents", needle_terms[needle_idx]),
2327                );
2328                writer.add_document(doc).unwrap();
2329            }
2330            writer.commit().await.unwrap();
2331        }
2332
2333        let index = Index::open(dir, config).await.unwrap();
2334        let total = index.num_docs().await.unwrap();
2335        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);
2336
2337        // Find EVERY needle
2338        for term in &needle_terms {
2339            let results = index.query(term, 10).await.unwrap();
2340            assert_eq!(
2341                results.hits.len(),
2342                1,
2343                "Should find exactly 1 doc for needle '{}'",
2344                term
2345            );
2346        }
2347
2348        // Verify hay term matches all hay docs
2349        let results = index.query("common", 500).await.unwrap();
2350        assert_eq!(
2351            results.hits.len(),
2352            hay_per_batch * 4,
2353            "Common term should match all {} hay docs",
2354            hay_per_batch * 4
2355        );
2356    }
2357}