
hermes_core/index/mod.rs

//! Index - multi-segment async search index
//!
//! The `Index` is the central concept that provides:
//! - `Index::create()` / `Index::open()` - create or open an index
//! - `index.writer()` - get an IndexWriter for adding documents
//! - `index.reader()` - get an IndexReader for searching (with reload policy)
//!
//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
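//!
//! A minimal end-to-end sketch, run inside an async context. This mirrors the
//! tests at the bottom of this module; the `hermes_core::` import paths are an
//! assumption based on the crate name:
//!
//! ```ignore
//! use hermes_core::directories::RamDirectory;
//! use hermes_core::dsl::{Document, SchemaBuilder};
//! use hermes_core::index::{Index, IndexConfig, IndexWriter};
//!
//! let mut schema_builder = SchemaBuilder::default();
//! let title = schema_builder.add_text_field("title", true, true);
//! let schema = schema_builder.build();
//!
//! let dir = RamDirectory::new();
//! let config = IndexConfig::default();
//!
//! // Write a document and commit it to a segment.
//! let mut writer = IndexWriter::create(dir.clone(), schema, config.clone()).await?;
//! let mut doc = Document::new();
//! doc.add_text(title, "Hello World");
//! writer.add_document(doc)?;
//! writer.commit().await?;
//!
//! // Open the index and run a query string against the default fields.
//! let index = Index::open(dir, config).await?;
//! let results = index.query("hello", 10).await?;
//! ```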

#[cfg(feature = "native")]
use crate::dsl::Schema;
#[cfg(feature = "native")]
use crate::error::Result;
#[cfg(feature = "native")]
use std::sync::Arc;

mod searcher;
pub use searcher::Searcher;

#[cfg(feature = "native")]
mod reader;
#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use reader::IndexReader;
#[cfg(feature = "native")]
pub use writer::{IndexWriter, PreparedCommit};

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

/// Default file name for the slice cache
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

/// Index configuration
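///
/// A sketch of overriding selected knobs via struct-update syntax, the same
/// pattern the tests in this module use:
///
/// ```ignore
/// let config = IndexConfig {
///     // Flush to a new segment once builders hold ~64 MB of pending documents.
///     max_indexing_memory_bytes: 64 * 1024 * 1024,
///     // Check for newly committed segments twice per second.
///     reload_interval_ms: 500,
///     ..Default::default()
/// };
/// ```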
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max memory (bytes) across all builders before auto-commit (global limit)
    pub max_indexing_memory_bytes: usize,
    /// Merge policy for background segment merging
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
    pub reload_interval_ms: u64,
    /// Maximum number of concurrent background merges (default: 4)
    pub max_concurrent_merges: usize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let indexing_threads = crate::default_indexing_threads();
        #[cfg(not(feature = "native"))]
        let indexing_threads = 1;

        #[cfg(feature = "native")]
        let compression_threads = crate::default_compression_threads();
        #[cfg(not(feature = "native"))]
        let compression_threads = 1;

        Self {
            num_threads: indexing_threads,
            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
            num_compression_threads: compression_threads,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
            reload_interval_ms: 1000, // 1 second default
            max_concurrent_merges: 4,
        }
    }
}

/// Multi-segment async Index
///
/// The central concept for search. Owns segment lifecycle and provides:
/// - `Index::create()` / `Index::open()` - create or open an index
/// - `index.writer()` - get an IndexWriter for adding documents
/// - `index.reader()` - get an IndexReader for searching with reload policy
///
/// All segment management is delegated to SegmentManager.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Segment manager - owns segments, tracker, metadata, and trained structures
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Cached reader (created lazily, reused across calls)
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}

#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Save initial metadata
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Open an existing index from a directory
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load metadata (includes schema)
        let metadata = IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));

        // Load trained structures into SegmentManager's ArcSwap
        segment_manager.load_and_publish_trained().await;

        Ok(Self {
            directory,
            schema,
            config,
            segment_manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Get a reference to the underlying directory
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Get the segment manager
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

    /// Get an IndexReader for searching (with reload policy)
    ///
    /// The reader is cached and reused across calls. The reader's internal
    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
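    ///
    /// A short usage sketch; the `searcher()` snapshot API is the one the tests
    /// in this module rely on:
    ///
    /// ```ignore
    /// let reader = index.reader().await?;
    /// // A searcher is a point-in-time snapshot of the committed segments.
    /// let searcher = reader.searcher().await?;
    /// let total = searcher.num_docs();
    /// ```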
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        self.cached_reader
            .get_or_try_init(|| async {
                IndexReader::from_segment_manager(
                    Arc::clone(&self.schema),
                    Arc::clone(&self.segment_manager),
                    self.config.term_cache_blocks,
                    self.config.reload_interval_ms,
                )
                .await
            })
            .await
    }

    /// Get the config
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    /// Get segment readers for query execution (convenience method)
    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    /// Total number of documents across all segments
    pub async fn num_docs(&self) -> Result<u32> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        Ok(searcher.num_docs())
    }

    /// Get default fields for search
    pub fn default_fields(&self) -> Vec<crate::Field> {
        if !self.schema.default_fields().is_empty() {
            self.schema.default_fields().to_vec()
        } else {
            self.schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        }
    }

    /// Get tokenizer registry
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(crate::tokenizer::TokenizerRegistry::default())
    }

    /// Create a query parser for this index
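    ///
    /// Sketch: parsing a query string once and reusing the parsed query, the
    /// same calls `query_offset` makes internally:
    ///
    /// ```ignore
    /// let parser = index.query_parser();
    /// let query = parser.parse("title:rust")?;
    /// let results = index.search(query.as_ref(), 10).await?;
    /// ```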
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let default_fields = self.default_fields();
        let tokenizers = self.tokenizers();

        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty()
            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
        {
            return crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                default_fields,
                tokenizers,
                router,
            );
        }

        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
    }

    /// Parse and search using a query string
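    ///
    /// A sketch of the query-string entry point, including offset-based
    /// pagination via `query_offset` (the `field:term` syntax is the form
    /// exercised by the tests below):
    ///
    /// ```ignore
    /// // Top 10 hits across the default fields.
    /// let page1 = index.query("title:rust", 10).await?;
    /// // Next page: same query, skipping the first 10 hits.
    /// let page2 = index.query_offset("title:rust", 10, 10).await?;
    /// for hit in &page1.hits {
    ///     println!("{:?} scored {}", hit.address, hit.score);
    /// }
    /// ```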
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Query with offset for pagination
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser
            .parse(query_str)
            .map_err(crate::error::Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }

    /// Search and return results
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Search with offset for pagination
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        let segments = searcher.segment_readers();

        // Each segment must return enough hits to cover the requested page.
        let fetch_limit = offset + limit;

        // Fan the query out to all segments concurrently, tagging each hit
        // with its segment id so a global DocAddress can be built later.
        let futures: Vec<_> = segments
            .iter()
            .map(|segment| {
                let sid = segment.meta().id;
                async move {
                    let results =
                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
                    Ok::<_, crate::error::Error>(
                        results
                            .into_iter()
                            .map(move |r| (sid, r))
                            .collect::<Vec<_>>(),
                    )
                }
            })
            .collect();

        let batches = futures::future::try_join_all(futures).await?;
        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
        for batch in batches {
            all_results.extend(batch);
        }

        // Merge per-segment results into a single descending-score ranking.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        // Apply pagination after the global sort.
        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get a document by its unique address
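    ///
    /// Sketch: resolving a hit from a search response back to its document,
    /// the same round-trip the tests below perform:
    ///
    /// ```ignore
    /// let results = index.query("rust", 10).await?;
    /// if let Some(hit) = results.hits.first() {
    ///     let doc = index.get_document(&hit.address).await?;
    /// }
    /// ```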
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let reader = self.reader().await?;
        let searcher = reader.searcher().await?;
        searcher.get_document(address).await
    }

    /// Get posting lists for a term across all segments
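    ///
    /// Sketch, mirroring the posting-list assertions in the tests below:
    ///
    /// ```ignore
    /// // Low-level inspection: how many docs contain "world" in `title`?
    /// let postings = index.get_postings(title, b"world").await?;
    /// for (segment, list) in &postings {
    ///     println!("segment {:?}: {} docs", segment.meta().id, list.doc_count());
    /// }
    /// ```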
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let segments = self.segment_readers().await?;
        let mut results = Vec::new();

        for segment in segments {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((segment, postings));
            }
        }

        Ok(results)
    }
}

/// Native-only methods for Index
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Get an IndexWriter for adding documents
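    ///
    /// A hedged sketch of the write path through an already-open `Index`; the
    /// tests below construct writers via `IndexWriter::create`/`open` directly,
    /// which is equivalent:
    ///
    /// ```ignore
    /// let mut writer = index.writer();
    /// let mut doc = Document::new();
    /// doc.add_text(title, "Hello World");
    /// writer.add_document(doc)?;
    /// // Documents become visible to readers once commit() persists a segment.
    /// writer.commit().await?;
    /// ```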
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::{Document, SchemaBuilder};

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Check postings
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document via searcher snapshot
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let doc = searcher.doc(seg_id, 0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add documents in batches to create multiple segments
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Open and check
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 15);
        // With queue-based indexing, exact segment count varies
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments by flushing between batches
        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Should have multiple segments (at least 2, one per flush with docs)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Expected multiple segments"
        );

        // Force merge
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 9);

        // Verify all documents accessible (order may vary with queue-based indexing)
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let mut found_docs = 0;
        for i in 0..9 {
            if searcher.doc(seg_id, i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // Test match query with multiple default fields
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        // Test match query with multiple tokens
        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Verify hit has address (segment_id + doc_id)
        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Verify document retrieval by address
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // Also verify doc retrieval via searcher snapshot
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let doc = searcher.doc(seg_id, 0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with some documents
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Open with slice caching and perform some operations to warm up cache
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        // Perform a search to warm up the cache
        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Check cache stats - should have cached some data
        let stats = index.directory.stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add document with multi-value field
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        // Add another document with different uris
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 2);

        // Verify document retrieval preserves all values
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        let doc = searcher.doc(seg_id, 0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // Verify to_json returns array for multi-value field
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Verify both values are searchable
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        // Verify searching for non-existent value returns no results
        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    /// Comprehensive test for MaxScore optimization in BooleanQuery OR queries
    ///
    /// This test verifies that:
    /// 1. BooleanQuery with multiple SHOULD term queries uses MaxScore automatically
    /// 2. Search results are correct regardless of MaxScore optimization
    /// 3. Scores are reasonable for matching documents
    #[tokio::test]
    async fn test_maxscore_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with documents containing various terms
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Doc 0: contains "rust" and "programming"
        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        // Doc 1: contains "rust" only
        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        // Doc 2: contains "programming" only
        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        // Doc 3: contains "python" (neither rust nor programming)
        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        // Doc 4: contains both "rust" and "programming" multiple times
        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Test 1: Pure OR query with multiple terms (should use MaxScore automatically)
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        // Test 2: Single term query (should NOT use MaxScore, but still work)
        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 3: Query with MUST (should NOT use MaxScore)
        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        // Must have "rust", optionally "programming"
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 4: Query with MUST_NOT (should NOT use MaxScore)
        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        // Should exclude docs with "systems" (doc 1 and 4)
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // Test 5: Verify top-k limit works correctly with MaxScore
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");

        // Top results should be docs that match both terms (higher scores)
        // Doc 0 and 4 contain both "rust" and "programming"
    }

    /// Test that BooleanQuery with pure SHOULD clauses uses MaxScore and returns correct results
    #[tokio::test]
    async fn test_boolean_or_maxscore_optimization() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add several documents
        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Pure SHOULD query — triggers MaxScore fast path
        let query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let results = index.search(&query, 10).await.unwrap();

        // "apple" matches docs 0,1,4,5,8,9 and "banana" matches docs 0,2,4,6,8
        // Union = {0,1,2,4,5,6,8,9} = 8 docs
        assert_eq!(results.hits.len(), 8, "Should find all matching docs");
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};

        // Create schema with dense vector field configured for IVF-RaBitQ
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true, // indexed
            true, // stored
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                quantization: DenseVectorQuantization::F32,
                num_clusters: Some(4), // Small for test
                nprobe: 2,
                build_threshold: Some(50), // Build when we have 50+ vectors
                unit_norm: false,
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Phase 1: Add vectors below threshold (should use Flat index)
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 documents (below threshold of 50)
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            // Simple embedding: all 8 components equal to i/30
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Open index and verify it's using Flat (not built yet)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_none(),
            "Should not have trained centroids below threshold"
        );

        // Search should work with Flat index
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers().await.unwrap();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1.0,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        // Phase 2: Add more vectors to cross threshold
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 more documents (total 60, above threshold of 50)
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        // Manually trigger vector index build (no longer auto-triggered by commit)
        writer.build_vector_index().await.unwrap();

        // Reopen index and verify trained structures are loaded
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_manager.trained().is_some(),
            "Should have loaded trained centroids for embedding field"
        );

        // Search should still work
        let segments = index.segment_readers().await.unwrap();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                0,
                1.0,
                crate::query::MultiValueCombiner::Max,
            )
            .await
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Phase 3: Verify calling build_vector_index again is a no-op
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap(); // Should skip training

        // Still built (trained structures present in ArcSwap)
        assert!(writer.segment_manager.trained().is_some());
    }

    /// Multi-round merge: flush many small segments, merge, add more, merge again.
    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
    #[tokio::test]
    async fn test_multi_round_merge_with_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        // --- Round 1: 5 segments × 10 docs = 50 docs ---
        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..5 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("alpha bravo charlie batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps over the lazy dog number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let pre_merge_segments = index.segment_readers().await.unwrap().len();
        assert!(
            pre_merge_segments >= 3,
            "Expected >=3 segments, got {}",
            pre_merge_segments
        );
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // Search before merge
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");

        // --- Merge round 1 ---
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 50);

        // Search after first merge
        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'alpha' after merge 1"
        );

        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(
            results.hits.len(),
            50,
            "all 50 docs should match 'fox' after merge 1"
        );

        // Verify all docs retrievable (single merged segment)
        let reader1 = index.reader().await.unwrap();
        let searcher1 = reader1.searcher().await.unwrap();
        let seg_id1 = searcher1.segment_readers()[0].meta().id;
        for i in 0..50 {
            let doc = searcher1.doc(seg_id1, i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
        }

        // --- Round 2: add 30 more docs in 3 segments ---
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        for batch in 0..3 {
            for i in 0..10 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
                );
                doc.add_text(
                    body,
                    format!("the quick brown fox jumps again number {}", i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), 80);
        assert!(
            index.segment_readers().await.unwrap().len() >= 2,
            "Should have >=2 segments after round 2 ingestion"
        );

        // Search spans both old merged segment and new segments
        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");

        // --- Merge round 2 ---
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), 80);

        // All searches still correct after second merge
        let results = index.query("fox", 100).await.unwrap();
        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");

        let results = index.query("alpha", 100).await.unwrap();
        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");

        let results = index.query("delta", 100).await.unwrap();
        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");

        // Verify all 80 docs retrievable (single merged segment)
        let reader2 = index.reader().await.unwrap();
        let searcher2 = reader2.searcher().await.unwrap();
        let seg_id2 = searcher2.segment_readers()[0].meta().id;
        for i in 0..80 {
            let doc = searcher2.doc(seg_id2, i).await.unwrap();
            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
        }
    }

    /// Large-scale merge: many segments with overlapping terms, verifying
    /// BM25 scoring and doc retrieval after merge.
    #[tokio::test]
    async fn test_large_scale_merge_correctness() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 8 batches × 25 docs = 200 docs total
        // Terms: "common" appears in all, "unique_N" appears in batch N only
        let total_docs = 200u32;
        for batch in 0..8 {
            for i in 0..25 {
                let mut doc = Document::new();
                doc.add_text(
                    title,
                    format!("common shared term unique_{} item{}", batch, i),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Verify pre-merge
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(
            results.hits.len(),
            total_docs as usize,
            "all docs should match 'common'"
        );

        // Each unique_N matches exactly 25 docs
        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
        }

        // Force merge
        let mut writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Verify post-merge: single segment, same results
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
        assert_eq!(index.num_docs().await.unwrap(), total_docs);

        let results = index.query("common", 300).await.unwrap();
        assert_eq!(results.hits.len(), total_docs as usize);

        for batch in 0..8 {
            let q = format!("unique_{}", batch);
            let results = index.query(&q, 100).await.unwrap();
            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
        }

        // Verify doc retrieval for every doc
        let reader = index.reader().await.unwrap();
        let searcher = reader.searcher().await.unwrap();
        let seg_id = searcher.segment_readers()[0].meta().id;
        for i in 0..total_docs {
            let doc = searcher.doc(seg_id, i).await.unwrap();
            assert!(doc.is_some(), "doc {} missing after merge", i);
        }
    }

    /// Test that auto-merge is triggered by the merge policy during commit,
    /// without calling force_merge. Uses MmapDirectory and higher parallelism
    /// to reproduce production conditions.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_auto_merge_triggered() {
        use crate::directories::MmapDirectory;
        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        // Aggressive policy: merge when 3 segments in same tier
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 12 segments with ~50 docs each (4x the aggressive threshold of 3)
        for batch in 0..12 {
            for i in 0..50 {
                let mut doc = Document::new();
                doc.add_text(title, format!("document_{} batch_{} alpha bravo", i, batch));
                doc.add_text(
                    body,
                    format!(
                        "the quick brown fox jumps over lazy dog number {} round {}",
                        i, batch
                    ),
                );
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let pre_merge = writer.segment_manager.get_segment_ids().await.len();

        // wait_for_merging_thread waits for the single in-flight merge. After it completes,
        // re-evaluate since segments accumulated while the merge was running.
        writer.wait_for_merging_thread().await;
        writer.maybe_merge().await;
        writer.wait_for_merging_thread().await;

        // After commit + auto-merge, segment count should be reduced
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        let segment_count = index.segment_readers().await.unwrap().len();
        eprintln!(
            "Segments: {} before merge, {} after auto-merge",
            pre_merge, segment_count
        );
        assert!(
            segment_count < pre_merge,
            "Expected auto-merge to reduce segments from {}, got {}",
            pre_merge,
            segment_count
        );
    }

    /// Regression test: commit with dense vector fields + aggressive merge policy.
    /// Exercises the race where background merge deletes segment files while
    /// maybe_build_vector_index → collect_vectors_for_training tries to open them.
    /// Before the fix, this would fail with "IO error: No such file or directory".
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_commit_with_vectors_and_background_merge() {
        use crate::directories::MmapDirectory;
        use crate::dsl::DenseVectorConfig;

        let tmp_dir = tempfile::tempdir().unwrap();
        let dir = MmapDirectory::new(tmp_dir.path());

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        // RaBitQ with very low build_threshold so vector index building triggers during commit
        let vec_config = DenseVectorConfig::new(8).with_build_threshold(10);
        let embedding =
            schema_builder.add_dense_vector_field_with_config("embedding", true, true, vec_config);
        let schema = schema_builder.build();

        // Aggressive merge: triggers background merges at 3 segments per tier
        let config = IndexConfig {
            max_indexing_memory_bytes: 4096,
            num_indexing_threads: 4,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
            ..Default::default()
        };

        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create 12 segments with vectors — enough to trigger both
        // background merges (aggressive policy) and vector index building (threshold=10)
        for batch in 0..12 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
                // 8-dim deterministic vector derived from i and batch
                let vec: Vec<f32> = (0..8).map(|j| (i * 8 + j + batch) as f32 * 0.1).collect();
1335                doc.add_dense_vector(embedding, vec);
1336                writer.add_document(doc).unwrap();
1337            }
1338            writer.commit().await.unwrap();
1339        }
1340        writer.wait_for_merging_thread().await;
1341
1342        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1343        let num_docs = index.num_docs().await.unwrap();
1344        assert_eq!(num_docs, 60, "Expected 60 docs, got {}", num_docs);
1345    }
1346
1347    /// Stress test: force_merge with many segments (iterative batching).
1348    /// Verifies that merging 50 segments doesn't OOM or exhaust file descriptors.
1349    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1350    async fn test_force_merge_many_segments() {
1351        use crate::directories::MmapDirectory;
1352        let tmp_dir = tempfile::tempdir().unwrap();
1353        let dir = MmapDirectory::new(tmp_dir.path());
1354
1355        let mut schema_builder = SchemaBuilder::default();
1356        let title = schema_builder.add_text_field("title", true, true);
1357        let schema = schema_builder.build();
1358
1359        let config = IndexConfig {
1360            max_indexing_memory_bytes: 512,
1361            ..Default::default()
1362        };
1363
1364        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1365            .await
1366            .unwrap();
1367
1368        // Create 50 tiny segments
1369        for batch in 0..50 {
1370            for i in 0..3 {
1371                let mut doc = Document::new();
1372                doc.add_text(title, format!("term_{} batch_{}", i, batch));
1373                writer.add_document(doc).unwrap();
1374            }
1375            writer.commit().await.unwrap();
1376        }
1377        // Wait for background merges before reading segment count
1378        writer.wait_for_merging_thread().await;
1379
1380        let seg_ids = writer.segment_manager.get_segment_ids().await;
1381        let pre = seg_ids.len();
1382        eprintln!("Segments before force_merge: {}", pre);
1383        assert!(pre >= 2, "Expected multiple segments, got {}", pre);
1384
1385        // Force merge all into one — should iterate in batches, not OOM
1386        writer.force_merge().await.unwrap();
1387
1388        let index2 = Index::open(dir, config).await.unwrap();
1389        let post = index2.segment_readers().await.unwrap().len();
1390        eprintln!("Segments after force_merge: {}", post);
1391        assert_eq!(post, 1);
1392        assert_eq!(index2.num_docs().await.unwrap(), 150);
1393    }
1394
1395    /// Test that background merges produce correct generation metadata.
1396    /// Creates many segments with aggressive policy, commits, waits for merges,
1397    /// and verifies that merged segments have generation >= 1 with correct ancestors.
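    ///
    /// Worked example of the invariant (assuming generation increments on each
    /// merge): fresh segments A, B, C are gen 0 with no ancestors; merging them
    /// produces D with gen 1 and ancestors [A, B, C]; merging D with a fresh E
    /// produces a gen-2 segment.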
1398    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1399    async fn test_background_merge_generation() {
1400        use crate::directories::MmapDirectory;
1401        let tmp_dir = tempfile::tempdir().unwrap();
1402        let dir = MmapDirectory::new(tmp_dir.path());
1403
1404        let mut schema_builder = SchemaBuilder::default();
1405        let title = schema_builder.add_text_field("title", true, true);
1406        let schema = schema_builder.build();
1407
1408        let config = IndexConfig {
1409            max_indexing_memory_bytes: 4096,
1410            num_indexing_threads: 2,
1411            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1412            ..Default::default()
1413        };
1414
1415        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1416            .await
1417            .unwrap();
1418
1419        // Create 15 small segments — enough for aggressive policy to trigger merges
1420        for batch in 0..15 {
1421            for i in 0..5 {
1422                let mut doc = Document::new();
1423                doc.add_text(title, format!("doc_{}_batch_{}", i, batch));
1424                writer.add_document(doc).unwrap();
1425            }
1426            writer.commit().await.unwrap();
1427        }
1428        writer.wait_for_merging_thread().await;
1429
1430        // Read metadata and verify generation tracking
1431        let metas = writer
1432            .segment_manager
1433            .read_metadata(|m| m.segment_metas.clone())
1434            .await;
1435
1436        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1437        eprintln!(
1438            "Segments after merge: {}, max generation: {}",
1439            metas.len(),
1440            max_gen
1441        );
1442
1443        // Background merges should have produced at least one merged segment (gen >= 1)
1444        assert!(
1445            max_gen >= 1,
1446            "Expected at least one merged segment (gen >= 1), got max_gen={}",
1447            max_gen
1448        );
1449
1450        // Every merged segment (gen > 0) must have non-empty ancestors
1451        for (id, info) in &metas {
1452            if info.generation > 0 {
1453                assert!(
1454                    !info.ancestors.is_empty(),
1455                    "Segment {} has gen={} but no ancestors",
1456                    id,
1457                    info.generation
1458                );
1459            } else {
1460                assert!(
1461                    info.ancestors.is_empty(),
1462                    "Fresh segment {} has gen=0 but has ancestors",
1463                    id
1464                );
1465            }
1466        }
1467    }
1468
1469    /// Test that merging preserves every single document.
1470    /// Indexes 1000+ unique documents across many segments, force-merges,
1471    /// and verifies exact doc count and that every unique term is searchable.
1472    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1473    async fn test_merge_preserves_all_documents() {
1474        use crate::directories::MmapDirectory;
1475        let tmp_dir = tempfile::tempdir().unwrap();
1476        let dir = MmapDirectory::new(tmp_dir.path());
1477
1478        let mut schema_builder = SchemaBuilder::default();
1479        let title = schema_builder.add_text_field("title", true, true);
1480        let schema = schema_builder.build();
1481
1482        let config = IndexConfig {
1483            max_indexing_memory_bytes: 4096,
1484            ..Default::default()
1485        };
1486
1487        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1488            .await
1489            .unwrap();
1490
1491        let total_docs = 1200;
1492        let docs_per_batch = 60;
1493        let batches = total_docs / docs_per_batch;
1494
1495        // Each doc has a unique term "uid_N" for verification
1496        for batch in 0..batches {
1497            for i in 0..docs_per_batch {
1498                let doc_num = batch * docs_per_batch + i;
1499                let mut doc = Document::new();
1500                doc.add_text(
1501                    title,
1502                    format!("uid_{} common_term batch_{}", doc_num, batch),
1503                );
1504                writer.add_document(doc).unwrap();
1505            }
1506            writer.commit().await.unwrap();
1507        }
1508
1509        let pre_segments = writer.segment_manager.get_segment_ids().await.len();
1510        assert!(
1511            pre_segments >= 2,
1512            "Need multiple segments, got {}",
1513            pre_segments
1514        );
1515
1516        // Force merge to single segment
1517        writer.force_merge().await.unwrap();
1518
1519        let index = Index::open(dir, config).await.unwrap();
1520        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1521        assert_eq!(
1522            index.num_docs().await.unwrap(),
1523            total_docs as u32,
1524            "Doc count mismatch after force_merge"
1525        );
1526
1527        // Verify every unique document is searchable
1528        let results = index.query("common_term", total_docs + 100).await.unwrap();
1529        assert_eq!(
1530            results.hits.len(),
1531            total_docs,
1532            "common_term should match all docs"
1533        );
1534
1535        // Spot-check unique IDs across the range
1536        for check in [0, 1, total_docs / 2, total_docs - 1] {
1537            let q = format!("uid_{}", check);
1538            let results = index.query(&q, 10).await.unwrap();
1539            assert_eq!(results.hits.len(), 1, "'{}' should match exactly 1 doc", q);
1540        }
1541    }
1542
1543    /// Multi-round commit+merge: verify doc count grows correctly
1544    /// and no documents are lost across multiple merge cycles.
1545    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1546    async fn test_multi_round_merge_doc_integrity() {
1547        use crate::directories::MmapDirectory;
1548        let tmp_dir = tempfile::tempdir().unwrap();
1549        let dir = MmapDirectory::new(tmp_dir.path());
1550
1551        let mut schema_builder = SchemaBuilder::default();
1552        let title = schema_builder.add_text_field("title", true, true);
1553        let schema = schema_builder.build();
1554
1555        let config = IndexConfig {
1556            max_indexing_memory_bytes: 4096,
1557            num_indexing_threads: 2,
1558            merge_policy: Box::new(crate::merge::TieredMergePolicy::aggressive()),
1559            ..Default::default()
1560        };
1561
1562        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1563            .await
1564            .unwrap();
1565
1566        let mut expected_total = 0u64;
1567
1568        // 4 rounds of: add docs → commit → wait for merges → verify count
1569        for round in 0..4 {
1570            let docs_this_round = 50 + round * 25; // 50, 75, 100, 125 (each divisible by 5, so the / 5 below drops nothing)
1571            for batch in 0..5 {
1572                for i in 0..docs_this_round / 5 {
1573                    let mut doc = Document::new();
1574                    doc.add_text(
1575                        title,
1576                        format!("round_{}_batch_{}_doc_{} searchable", round, batch, i),
1577                    );
1578                    writer.add_document(doc).unwrap();
1579                }
1580                writer.commit().await.unwrap();
1581            }
1582            writer.wait_for_merging_thread().await;
1583
1584            expected_total += docs_this_round as u64;
1585
1586            let actual = writer
1587                .segment_manager
1588                .read_metadata(|m| {
1589                    m.segment_metas
1590                        .values()
1591                        .map(|s| s.num_docs as u64)
1592                        .sum::<u64>()
1593                })
1594                .await;
1595
1596            assert_eq!(
1597                actual, expected_total,
1598                "Round {}: expected {} docs, metadata reports {}",
1599                round, expected_total, actual
1600            );
1601        }
1602
1603        // Final verify: open fresh and query
1604        let index = Index::open(dir, config).await.unwrap();
1605        assert_eq!(index.num_docs().await.unwrap(), expected_total as u32);
1606
1607        let results = index
1608            .query("searchable", expected_total as usize + 100)
1609            .await
1610            .unwrap();
1611        assert_eq!(
1612            results.hits.len(),
1613            expected_total as usize,
1614            "All docs should match 'searchable'"
1615        );
1616
1617        // Check generation grew across rounds
1618        let metas = index
1619            .segment_manager()
1620            .read_metadata(|m| m.segment_metas.clone())
1621            .await;
1622        let max_gen = metas.values().map(|m| m.generation).max().unwrap_or(0);
1623        eprintln!(
1624            "Final: {} segments, {} docs, max generation={}",
1625            metas.len(),
1626            expected_total,
1627            max_gen
1628        );
1629        assert!(
1630            max_gen >= 1,
1631            "Multiple merge rounds should produce gen >= 1"
1632        );
1633    }
1634
1635    /// Sustained indexing: verify the segment count stays bounded at O(log N).
1636    ///
1637    /// Indexes many small batches under an aggressive tiered merge policy and
1638    /// checks that the segment count never grows unbounded. With tiered merging
1639    /// the count should stay roughly segments_per_tier * num_tiers ≈ O(log N).
1640    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1641    async fn test_segment_count_bounded_during_sustained_indexing() {
1642        use crate::directories::MmapDirectory;
1643        let tmp_dir = tempfile::tempdir().unwrap();
1644        let dir = MmapDirectory::new(tmp_dir.path());
1645
1646        let mut schema_builder = SchemaBuilder::default();
1647        let title = schema_builder.add_text_field("title", true, false);
1648        let schema = schema_builder.build();
1649
1650        let policy = crate::merge::TieredMergePolicy {
1651            segments_per_tier: 3,
1652            max_merge_at_once: 5,
1653            tier_factor: 10.0,
1654            tier_floor: 50,
1655            max_merged_docs: 1_000_000,
1656        };
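        // tier_floor=50 puts every flush-sized segment (30 docs or fewer) in
        // tier 0, and segments_per_tier=3 should trigger a merge as soon as a
        // tier accumulates 3 segments; that is what keeps the count bounded.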
1657
1658        let config = IndexConfig {
1659            max_indexing_memory_bytes: 4096, // tiny budget → frequent flushes
1660            num_indexing_threads: 1,
1661            merge_policy: Box::new(policy),
1662            max_concurrent_merges: 4,
1663            ..Default::default()
1664        };
1665
1666        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1667            .await
1668            .unwrap();
1669
1670        let num_commits = 40;
1671        let docs_per_commit = 30;
1672        let total_docs = num_commits * docs_per_commit;
1673        let mut max_segments_seen = 0usize;
1674
1675        for commit_idx in 0..num_commits {
1676            for i in 0..docs_per_commit {
1677                let mut doc = Document::new();
1678                doc.add_text(
1679                    title,
1680                    format!("doc_{} text", commit_idx * docs_per_commit + i),
1681                );
1682                writer.add_document(doc).unwrap();
1683            }
1684            writer.commit().await.unwrap();
1685
1686            // Give background merges a moment to run
1687            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1688
1689            let seg_count = writer.segment_manager.get_segment_ids().await.len();
1690            max_segments_seen = max_segments_seen.max(seg_count);
1691        }
1692
1693        // Wait for all merges to finish
1694        writer.wait_for_all_merges().await;
1695
1696        let final_segments = writer.segment_manager.get_segment_ids().await.len();
1697        let final_docs: u64 = writer
1698            .segment_manager
1699            .read_metadata(|m| {
1700                m.segment_metas
1701                    .values()
1702                    .map(|s| s.num_docs as u64)
1703                    .sum::<u64>()
1704            })
1705            .await;
1706
1707        eprintln!(
1708            "Sustained indexing: {} commits, {} total docs, final segments={}, max segments seen={}",
1709            num_commits, total_docs, final_segments, max_segments_seen
1710        );
1711
1712        // With 1200 docs and segments_per_tier=3, tier_floor=50:
1713        // tier 0: ≤50 docs, tier 1: 50-500, tier 2: 500-5000
1714        // We should have at most ~3 segments per tier * ~3 tiers ≈ 9-12 segments at peak.
1715        // The key invariant: segment count must NOT grow linearly with commits.
1716        // 40 commits should NOT produce 40 segments.
1717        let max_allowed = num_commits / 2; // generous: at most half the commits as segments
1718        assert!(
1719            max_segments_seen <= max_allowed,
1720            "Segment count grew too fast: max seen {} > allowed {} (out of {} commits). \
1721             Merging is not keeping up.",
1722            max_segments_seen,
1723            max_allowed,
1724            num_commits
1725        );
1726
1727        // After all merges complete, should be well under the limit
1728        assert!(
1729            final_segments <= 10,
1730            "After all merges, expected ≤10 segments, got {}",
1731            final_segments
1732        );
1733
1734        // No data loss
1735        assert_eq!(
1736            final_docs, total_docs as u64,
1737            "Expected {} docs, metadata reports {}",
1738            total_docs, final_docs
1739        );
1740    }
1741
1742    // ========================================================================
1743    // Needle-in-haystack comprehensive tests
1744    // ========================================================================
1745
1746    /// Full-text needle-in-haystack: one unique term among many documents.
1747    /// Verifies exact retrieval, scoring, and document content after commit + reopen.
1748    #[tokio::test]
1749    async fn test_needle_fulltext_single_segment() {
1750        let mut sb = SchemaBuilder::default();
1751        let title = sb.add_text_field("title", true, true);
1752        let body = sb.add_text_field("body", true, true);
1753        let schema = sb.build();
1754
1755        let dir = RamDirectory::new();
1756        let config = IndexConfig::default();
1757        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1758            .await
1759            .unwrap();
1760
1761        // 100 hay documents
1762        for i in 0..100 {
1763            let mut doc = Document::new();
1764            doc.add_text(title, format!("Hay document number {}", i));
1765            doc.add_text(
1766                body,
1767                "common words repeated across all hay documents filler text",
1768            );
1769            writer.add_document(doc).unwrap();
1770        }
1771
1772        // 1 needle document (doc 100)
1773        let mut needle = Document::new();
1774        needle.add_text(title, "The unique needle xylophone");
1775        needle.add_text(
1776            body,
1777            "This document contains the extraordinary term xylophone",
1778        );
1779        // The 100 hay docs above occupy doc ids 0..100, so the needle added
1780        // next becomes doc 100, with 50 more hay docs following it
1781        writer.add_document(needle).unwrap();
1782
1783        // 50 more hay documents after needle
1784        for i in 100..150 {
1785            let mut doc = Document::new();
1786            doc.add_text(title, format!("More hay document {}", i));
1787            doc.add_text(body, "common words filler text again and again");
1788            writer.add_document(doc).unwrap();
1789        }
1790
1791        writer.commit().await.unwrap();
1792
1793        let index = Index::open(dir, config).await.unwrap();
1794        assert_eq!(index.num_docs().await.unwrap(), 151);
1795
1796        // Search for the needle term
1797        let results = index.query("xylophone", 10).await.unwrap();
1798        assert_eq!(results.hits.len(), 1, "Should find exactly the needle");
1799        assert!(results.hits[0].score > 0.0, "Score should be positive");
1800
1801        // Verify document content
1802        let doc = index
1803            .get_document(&results.hits[0].address)
1804            .await
1805            .unwrap()
1806            .unwrap();
1807        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1808        assert!(
1809            title_val.contains("xylophone"),
1810            "Retrieved doc should be the needle"
1811        );
1812
1813        // Search for common term — should return many
1814        let results = index.query("common", 200).await.unwrap();
1815        assert!(
1816            results.hits.len() >= 100,
1817            "Common term should match many docs"
1818        );
1819
1820        // Negative test — term that doesn't exist
1821        let results = index.query("nonexistentterm99999", 10).await.unwrap();
1822        assert_eq!(
1823            results.hits.len(),
1824            0,
1825            "Non-existent term should match nothing"
1826        );
1827    }
1828
1829    /// Full-text needle across multiple segments: ensures cross-segment search works.
1830    #[tokio::test]
1831    async fn test_needle_fulltext_multi_segment() {
1832        use crate::query::TermQuery;
1833
1834        let mut sb = SchemaBuilder::default();
1835        let content = sb.add_text_field("content", true, true);
1836        let schema = sb.build();
1837
1838        let dir = RamDirectory::new();
1839        let config = IndexConfig::default();
1840        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1841            .await
1842            .unwrap();
1843
1844        // Segment 1: 50 hay docs
1845        for i in 0..50 {
1846            let mut doc = Document::new();
1847            doc.add_text(content, format!("segment one hay document {}", i));
1848            writer.add_document(doc).unwrap();
1849        }
1850        writer.commit().await.unwrap();
1851
1852        // Segment 2: needle + 49 hay docs
1853        let mut needle = Document::new();
1854        needle.add_text(content, "the magnificent quetzalcoatl serpent deity");
1855        writer.add_document(needle).unwrap();
1856        for i in 0..49 {
1857            let mut doc = Document::new();
1858            doc.add_text(content, format!("segment two hay document {}", i));
1859            writer.add_document(doc).unwrap();
1860        }
1861        writer.commit().await.unwrap();
1862
1863        // Segment 3: 50 more hay docs
1864        for i in 0..50 {
1865            let mut doc = Document::new();
1866            doc.add_text(content, format!("segment three hay document {}", i));
1867            writer.add_document(doc).unwrap();
1868        }
1869        writer.commit().await.unwrap();
1870
1871        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1872        assert_eq!(index.num_docs().await.unwrap(), 150);
1873        let num_segments = index.segment_readers().await.unwrap().len();
1874        assert!(
1875            num_segments >= 2,
1876            "Should have multiple segments, got {}",
1877            num_segments
1878        );
1879
1880        // Find needle across segments
1881        let results = index.query("quetzalcoatl", 10).await.unwrap();
1882        assert_eq!(
1883            results.hits.len(),
1884            1,
1885            "Should find exactly 1 needle across segments"
1886        );
1887
1888        // Verify using TermQuery directly
1889        let reader = index.reader().await.unwrap();
1890        let searcher = reader.searcher().await.unwrap();
1891        let tq = TermQuery::text(content, "quetzalcoatl");
1892        let results = searcher.search(&tq, 10).await.unwrap();
1893        assert_eq!(results.len(), 1, "TermQuery should also find the needle");
1894
1895        // Verify content
1896        let doc = searcher
1897            .doc(results[0].segment_id, results[0].doc_id)
1898            .await
1899            .unwrap()
1900            .unwrap();
1901        let text = doc.get_first(content).unwrap().as_text().unwrap();
1902        assert!(
1903            text.contains("quetzalcoatl"),
1904            "Should retrieve needle content"
1905        );
1906
1907        // Cross-segment term that exists in all segments
1908        let results = index.query("document", 200).await.unwrap();
1909        assert!(
1910            results.hits.len() >= 149,
1911            "Should find hay docs across all segments"
1912        );
1913    }
1914
1915    /// Sparse vector needle-in-haystack: one document with unique dimensions.
1916    #[tokio::test]
1917    async fn test_needle_sparse_vector() {
1918        use crate::query::SparseVectorQuery;
1919
1920        let mut sb = SchemaBuilder::default();
1921        let title = sb.add_text_field("title", true, true);
1922        let sparse = sb.add_sparse_vector_field("sparse", true, true);
1923        let schema = sb.build();
1924
1925        let dir = RamDirectory::new();
1926        let config = IndexConfig::default();
1927        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1928            .await
1929            .unwrap();
1930
1931        // 100 hay documents with sparse vectors on dimensions 0-9
1932        for i in 0..100 {
1933            let mut doc = Document::new();
1934            doc.add_text(title, format!("Hay sparse doc {}", i));
1935            // All hay docs share dimensions 0-9 with varying weights
1936            let entries: Vec<(u32, f32)> = (0..10)
1937                .map(|d| (d, 0.1 + (i as f32 * 0.001) + (d as f32 * 0.01)))
1938                .collect();
1939            doc.add_sparse_vector(sparse, entries);
1940            writer.add_document(doc).unwrap();
1941        }
1942
1943        // Needle: unique dimensions 1000, 1001, 1002 (no other doc has these)
1944        let mut needle = Document::new();
1945        needle.add_text(title, "Needle sparse document");
1946        needle.add_sparse_vector(
1947            sparse,
1948            vec![(1000, 0.9), (1001, 0.8), (1002, 0.7), (5, 0.3)],
1949        );
1950        writer.add_document(needle).unwrap();
1951
1952        // 50 more hay docs
1953        for i in 100..150 {
1954            let mut doc = Document::new();
1955            doc.add_text(title, format!("More hay sparse doc {}", i));
1956            let entries: Vec<(u32, f32)> = (0..10).map(|d| (d, 0.2 + (d as f32 * 0.02))).collect();
1957            doc.add_sparse_vector(sparse, entries);
1958            writer.add_document(doc).unwrap();
1959        }
1960
1961        writer.commit().await.unwrap();
1962
1963        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1964        assert_eq!(index.num_docs().await.unwrap(), 151);
1965
1966        // Query with needle's unique dimensions
1967        let reader = index.reader().await.unwrap();
1968        let searcher = reader.searcher().await.unwrap();
1969        let query = SparseVectorQuery::new(sparse, vec![(1000, 1.0), (1001, 1.0), (1002, 1.0)]);
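        // Assumption: sparse scoring accumulates query*doc weight products over
        // shared dimensions, so only the needle (sole holder of dims 1000-1002)
        // can match this query at all.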
1970        let results = searcher.search(&query, 10).await.unwrap();
1971        assert_eq!(results.len(), 1, "Only needle has dims 1000-1002");
1972        assert!(results[0].score > 0.0, "Needle score should be positive");
1973
1974        // Verify it's the right document
1975        let doc = searcher
1976            .doc(results[0].segment_id, results[0].doc_id)
1977            .await
1978            .unwrap()
1979            .unwrap();
1980        let title_val = doc.get_first(title).unwrap().as_text().unwrap();
1981        assert_eq!(title_val, "Needle sparse document");
1982
1983        // Query with shared dimension — should match many
1984        let query_shared = SparseVectorQuery::new(sparse, vec![(5, 1.0)]);
1985        let results = searcher.search(&query_shared, 200).await.unwrap();
1986        assert!(
1987            results.len() >= 100,
1988            "Shared dim 5 should match many docs, got {}",
1989            results.len()
1990        );
1991
1992        // Query with non-existent dimension — should match nothing
1993        let query_missing = SparseVectorQuery::new(sparse, vec![(99999, 1.0)]);
1994        let results = searcher.search(&query_missing, 10).await.unwrap();
1995        assert_eq!(
1996            results.len(),
1997            0,
1998            "Non-existent dimension should match nothing"
1999        );
2000    }
2001
2002    /// Sparse vector needle across multiple segments with merge.
2003    #[tokio::test]
2004    async fn test_needle_sparse_vector_multi_segment_merge() {
2005        use crate::query::SparseVectorQuery;
2006
2007        let mut sb = SchemaBuilder::default();
2008        let title = sb.add_text_field("title", true, true);
2009        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2010        let schema = sb.build();
2011
2012        let dir = RamDirectory::new();
2013        let config = IndexConfig::default();
2014        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2015            .await
2016            .unwrap();
2017
2018        // Segment 1: hay
2019        for i in 0..30 {
2020            let mut doc = Document::new();
2021            doc.add_text(title, format!("seg1 hay {}", i));
2022            doc.add_sparse_vector(sparse, vec![(0, 0.5), (1, 0.3)]);
2023            writer.add_document(doc).unwrap();
2024        }
2025        writer.commit().await.unwrap();
2026
2027        // Segment 2: needle + hay
2028        let mut needle = Document::new();
2029        needle.add_text(title, "seg2 needle");
2030        needle.add_sparse_vector(sparse, vec![(500, 0.95), (501, 0.85)]);
2031        writer.add_document(needle).unwrap();
2032        for i in 0..29 {
2033            let mut doc = Document::new();
2034            doc.add_text(title, format!("seg2 hay {}", i));
2035            doc.add_sparse_vector(sparse, vec![(0, 0.4), (2, 0.6)]);
2036            writer.add_document(doc).unwrap();
2037        }
2038        writer.commit().await.unwrap();
2039
2040        // Verify pre-merge
2041        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2042        assert_eq!(index.num_docs().await.unwrap(), 60);
2043
2044        let reader = index.reader().await.unwrap();
2045        let searcher = reader.searcher().await.unwrap();
2046        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2047        let results = searcher.search(&query, 10).await.unwrap();
2048        assert_eq!(results.len(), 1, "Pre-merge: needle should be found");
2049        let doc = searcher
2050            .doc(results[0].segment_id, results[0].doc_id)
2051            .await
2052            .unwrap()
2053            .unwrap();
2054        assert_eq!(
2055            doc.get_first(title).unwrap().as_text().unwrap(),
2056            "seg2 needle"
2057        );
2058
2059        // Force merge
2060        let mut writer = IndexWriter::open(dir.clone(), config.clone())
2061            .await
2062            .unwrap();
2063        writer.force_merge().await.unwrap();
2064
2065        // Verify post-merge
2066        let index = Index::open(dir, config).await.unwrap();
2067        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
2068        assert_eq!(index.num_docs().await.unwrap(), 60);
2069
2070        let reader = index.reader().await.unwrap();
2071        let searcher = reader.searcher().await.unwrap();
2072        let query = SparseVectorQuery::new(sparse, vec![(500, 1.0), (501, 1.0)]);
2073        let results = searcher.search(&query, 10).await.unwrap();
2074        assert_eq!(results.len(), 1, "Post-merge: needle should still be found");
2075        let doc = searcher
2076            .doc(results[0].segment_id, results[0].doc_id)
2077            .await
2078            .unwrap()
2079            .unwrap();
2080        assert_eq!(
2081            doc.get_first(title).unwrap().as_text().unwrap(),
2082            "seg2 needle"
2083        );
2084    }
2085
2086    /// Dense vector needle-in-haystack using brute-force (Flat) search.
2087    #[tokio::test]
2088    async fn test_needle_dense_vector_flat() {
2089        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2090        use crate::query::DenseVectorQuery;
2091
2092        let dim = 16;
2093        let mut sb = SchemaBuilder::default();
2094        let title = sb.add_text_field("title", true, true);
2095        let embedding = sb.add_dense_vector_field_with_config(
2096            "embedding",
2097            true,
2098            true,
2099            DenseVectorConfig {
2100                dim,
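                // Flat = brute-force scan over all stored vectors (no ANN
                // structure), so the exact-match needle below should rank first.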
2101                index_type: VectorIndexType::Flat,
2102                quantization: crate::dsl::DenseVectorQuantization::F32,
2103                num_clusters: None,
2104                nprobe: 0,
2105                build_threshold: None,
2106                unit_norm: false,
2107            },
2108        );
2109        let schema = sb.build();
2110
2111        let dir = RamDirectory::new();
2112        let config = IndexConfig::default();
2113        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2114            .await
2115            .unwrap();
2116
2117        // 100 hay docs: small deterministic vectors near the origin
2118        for i in 0..100 {
2119            let mut doc = Document::new();
2120            doc.add_text(title, format!("Hay dense doc {}", i));
2121            // Hay vectors: low-magnitude, varying direction
2122            let vec: Vec<f32> = (0..dim)
2123                .map(|d| ((i * 7 + d * 13) % 100) as f32 / 1000.0)
2124                .collect();
2125            doc.add_dense_vector(embedding, vec);
2126            writer.add_document(doc).unwrap();
2127        }
2128
2129        // Needle: vector pointing strongly in one direction [1,1,1,...,1]
2130        let mut needle = Document::new();
2131        needle.add_text(title, "Needle dense document");
2132        let needle_vec: Vec<f32> = vec![1.0; dim];
2133        needle.add_dense_vector(embedding, needle_vec.clone());
2134        writer.add_document(needle).unwrap();
2135
2136        writer.commit().await.unwrap();
2137
2138        let index = Index::open(dir, config).await.unwrap();
2139        assert_eq!(index.num_docs().await.unwrap(), 101);
2140
2141        // Query with the needle vector — it should be the top result
2142        let reader = index.reader().await.unwrap();
2143        let searcher = reader.searcher().await.unwrap();
2144        let query = DenseVectorQuery::new(embedding, needle_vec);
2145        let results = searcher.search(&query, 5).await.unwrap();
2146        assert!(!results.is_empty(), "Should find at least 1 result");
2147
2148        // The needle (exact match) should be the top result with highest score
2149        let top_doc = searcher
2150            .doc(results[0].segment_id, results[0].doc_id)
2151            .await
2152            .unwrap()
2153            .unwrap();
2154        let top_title = top_doc.get_first(title).unwrap().as_text().unwrap();
2155        assert_eq!(
2156            top_title, "Needle dense document",
2157            "Top result should be the needle (exact vector match)"
2158        );
2159        assert!(
2160            results[0].score > 0.9,
2161            "Exact match should have very high cosine similarity, got {}",
2162            results[0].score
2163        );
2164    }
2165
2166    /// Combined: full-text + sparse + dense in the same index.
2167    /// Verifies all three retrieval paths work independently on the same dataset.
2168    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2169    async fn test_needle_combined_all_modalities() {
2170        use crate::directories::MmapDirectory;
2171        use crate::dsl::{DenseVectorConfig, VectorIndexType};
2172        use crate::query::{DenseVectorQuery, SparseVectorQuery, TermQuery};
2173
2174        let tmp_dir = tempfile::tempdir().unwrap();
2175        let dir = MmapDirectory::new(tmp_dir.path());
2176
2177        let dim = 8;
2178        let mut sb = SchemaBuilder::default();
2179        let title = sb.add_text_field("title", true, true);
2180        let body = sb.add_text_field("body", true, true);
2181        let sparse = sb.add_sparse_vector_field("sparse", true, true);
2182        let embedding = sb.add_dense_vector_field_with_config(
2183            "embedding",
2184            true,
2185            true,
2186            DenseVectorConfig {
2187                dim,
2188                index_type: VectorIndexType::Flat,
2189                quantization: crate::dsl::DenseVectorQuantization::F32,
2190                num_clusters: None,
2191                nprobe: 0,
2192                build_threshold: None,
2193                unit_norm: false,
2194            },
2195        );
2196        let schema = sb.build();
2197
2198        let config = IndexConfig::default();
2199        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2200            .await
2201            .unwrap();
2202
2203        // 80 hay docs with all three modalities
2204        for i in 0..80u32 {
2205            let mut doc = Document::new();
2206            doc.add_text(title, format!("Hay doc {}", i));
2207            doc.add_text(body, "general filler text about nothing special");
2208            doc.add_sparse_vector(sparse, vec![(0, 0.3), (1, 0.2), ((i % 10) + 10, 0.5)]);
2209            let vec: Vec<f32> = (0..dim)
2210                .map(|d| ((i as usize * 3 + d * 7) % 50) as f32 / 100.0)
2211                .collect();
2212            doc.add_dense_vector(embedding, vec);
2213            writer.add_document(doc).unwrap();
2214        }
2215
2216        // Needle doc: unique in ALL three modalities
2217        let mut needle = Document::new();
2218        needle.add_text(title, "The extraordinary rhinoceros");
2219        needle.add_text(
2220            body,
2221            "This document about rhinoceros is the only one with this word",
2222        );
2223        needle.add_sparse_vector(sparse, vec![(9999, 0.99), (9998, 0.88)]);
2224        let needle_vec = vec![0.9; dim];
2225        needle.add_dense_vector(embedding, needle_vec.clone());
2226        writer.add_document(needle).unwrap();
2227
2228        writer.commit().await.unwrap();
2229
2230        let index = Index::open(dir, config).await.unwrap();
2231        assert_eq!(index.num_docs().await.unwrap(), 81);
2232
2233        let reader = index.reader().await.unwrap();
2234        let searcher = reader.searcher().await.unwrap();
2235
2236        // --- Full-text needle ---
2237        let tq = TermQuery::text(body, "rhinoceros");
2238        let results = searcher.search(&tq, 10).await.unwrap();
2239        assert_eq!(
2240            results.len(),
2241            1,
2242            "Full-text: should find exactly the needle"
2243        );
2244        let doc = searcher
2245            .doc(results[0].segment_id, results[0].doc_id)
2246            .await
2247            .unwrap()
2248            .unwrap();
2249        assert!(
2250            doc.get_first(title)
2251                .unwrap()
2252                .as_text()
2253                .unwrap()
2254                .contains("rhinoceros")
2255        );
2256
2257        // --- Sparse vector needle ---
2258        let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0), (9998, 1.0)]);
2259        let results = searcher.search(&sq, 10).await.unwrap();
2260        assert_eq!(results.len(), 1, "Sparse: should find exactly the needle");
2261        let doc = searcher
2262            .doc(results[0].segment_id, results[0].doc_id)
2263            .await
2264            .unwrap()
2265            .unwrap();
2266        assert!(
2267            doc.get_first(title)
2268                .unwrap()
2269                .as_text()
2270                .unwrap()
2271                .contains("rhinoceros")
2272        );
2273
2274        // --- Dense vector needle ---
2275        let dq = DenseVectorQuery::new(embedding, needle_vec);
2276        let results = searcher.search(&dq, 1).await.unwrap();
2277        assert!(!results.is_empty(), "Dense: should find at least 1 result");
2278        let doc = searcher
2279            .doc(results[0].segment_id, results[0].doc_id)
2280            .await
2281            .unwrap()
2282            .unwrap();
2283        assert_eq!(
2284            doc.get_first(title).unwrap().as_text().unwrap(),
2285            "The extraordinary rhinoceros",
2286            "Dense: top-1 should be the needle"
2287        );
2288
2289        // Verify all three found the same document
2290        let ft_doc_id = {
2291            let tq = TermQuery::text(body, "rhinoceros");
2292            let r = searcher.search(&tq, 1).await.unwrap();
2293            r[0].doc_id
2294        };
2295        let sp_doc_id = {
2296            let sq = SparseVectorQuery::new(sparse, vec![(9999, 1.0)]);
2297            let r = searcher.search(&sq, 1).await.unwrap();
2298            r[0].doc_id
2299        };
2300        let dn_doc_id = {
2301            let dq = DenseVectorQuery::new(embedding, vec![0.9; dim]);
2302            let r = searcher.search(&dq, 1).await.unwrap();
2303            r[0].doc_id
2304        };
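
        // Comparing raw doc_ids is meaningful only because the single commit
        // above should have produced a single segment; across segments the
        // segment_id would need to be compared as well.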
2305
2306        assert_eq!(
2307            ft_doc_id, sp_doc_id,
2308            "Full-text and sparse should find same doc"
2309        );
2310        assert_eq!(
2311            sp_doc_id, dn_doc_id,
2312            "Sparse and dense should find same doc"
2313        );
2314    }
2315
2316    /// Stress test: many needles scattered across segments, verify ALL are found.
2317    #[tokio::test]
2318    async fn test_many_needles_all_found() {
2319        let mut sb = SchemaBuilder::default();
2320        let content = sb.add_text_field("content", true, true);
2321        let schema = sb.build();
2322
2323        let dir = RamDirectory::new();
2324        let config = IndexConfig::default();
2325        let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2326            .await
2327            .unwrap();
2328
2329        let num_needles = 20usize;
2330        let hay_per_batch = 50usize;
2331        let needle_terms: Vec<String> = (0..num_needles)
2332            .map(|i| format!("uniqueneedle{:04}", i))
2333            .collect();
2334
2335        // Interleave needles with hay across commits
2336        for batch in 0..4 {
2337            // Hay
2338            for i in 0..hay_per_batch {
2339                let mut doc = Document::new();
2340                doc.add_text(
2341                    content,
2342                    format!("hay batch {} item {} common filler", batch, i),
2343                );
2344                writer.add_document(doc).unwrap();
2345            }
2346            // 5 needles per batch
2347            for n in 0..5 {
2348                let needle_idx = batch * 5 + n;
2349                let mut doc = Document::new();
2350                doc.add_text(
2351                    content,
2352                    format!("this is {} among many documents", needle_terms[needle_idx]),
2353                );
2354                writer.add_document(doc).unwrap();
2355            }
2356            writer.commit().await.unwrap();
2357        }
2358
2359        let index = Index::open(dir, config).await.unwrap();
2360        let total = index.num_docs().await.unwrap();
2361        assert_eq!(total, (hay_per_batch * 4 + num_needles) as u32);
2362
2363        // Find EVERY needle
2364        for term in &needle_terms {
2365            let results = index.query(term, 10).await.unwrap();
2366            assert_eq!(
2367                results.hits.len(),
2368                1,
2369                "Should find exactly 1 doc for needle '{}'",
2370                term
2371            );
2372        }
2373
2374        // Verify hay term matches all hay docs
2375        let results = index.query("common", 500).await.unwrap();
2376        assert_eq!(
2377            results.hits.len(),
2378            hay_per_batch * 4,
2379            "Common term should match all {} hay docs",
2380            hay_per_batch * 4
2381        );
2382    }
2383
2384    /// Verify that every document's FIELDS (not just existence) survive multiple
2385    /// merge rounds. Uses documents large enough to create multiple store blocks
2386    /// per segment. This catches store merger bugs that silently lose blocks.
2387    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2388    async fn test_store_fields_survive_multiple_merges() {
2389        use crate::directories::MmapDirectory;
2390
2391        let tmp_dir = tempfile::tempdir().unwrap();
2392        let dir = MmapDirectory::new(tmp_dir.path());
2393
2394        let mut schema_builder = SchemaBuilder::default();
2395        let title = schema_builder.add_text_field("title", true, true);
2396        let body = schema_builder.add_text_field("body", false, true);
2397        let schema = schema_builder.build();
2398
2399        let config = IndexConfig {
2400            max_indexing_memory_bytes: 1024 * 64,
2401            num_indexing_threads: 2,
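            // NoMergePolicy: no background merging, so segment counts here are
            // driven solely by the explicit force_merge calls below.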
2402            merge_policy: Box::new(crate::merge::NoMergePolicy),
2403            ..Default::default()
2404        };
2405
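        // ~1 KB of stored text per doc ("abcdefghij " is 11 bytes repeated 90
        // times, about 990 bytes), so each segment's store spans several blocks.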
2406        let make_doc = |round: usize, idx: usize| -> Document {
2407            let mut doc = Document::new();
2408            doc.add_text(title, format!("doc_r{}_i{} searchterm", round, idx));
2409            let body_text = format!("round={} idx={} {}", round, idx, "abcdefghij ".repeat(90));
2410            doc.add_text(body, body_text);
2411            doc
2412        };
2413
2414        let mut total_docs = 0usize;
2415
2416        // === Round 1: 200 docs across multiple segments ===
2417        {
2418            let mut writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2419                .await
2420                .unwrap();
2421            for batch in 0..4 {
2422                for i in 0..50 {
2423                    writer.add_document(make_doc(1, batch * 50 + i)).unwrap();
2424                }
2425                writer.commit().await.unwrap();
2426            }
2427            total_docs += 200;
2428        }
2429
2430        // Force merge round 1
2431        {
2432            let mut writer = IndexWriter::open(dir.clone(), config.clone())
2433                .await
2434                .unwrap();
2435            writer.force_merge().await.unwrap();
2436        }
2437
2438        // Verify every doc's fields after merge 1
2439        {
2440            let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2441            assert_eq!(index.num_docs().await.unwrap(), total_docs as u32);
2442
2443            let reader = index.reader().await.unwrap();
2444            let searcher = reader.searcher().await.unwrap();
2445            assert_eq!(
2446                searcher.num_segments(),
2447                1,
2448                "should be 1 segment after merge 1"
2449            );
2450            let seg = &searcher.segment_readers()[0];
2451            let seg_id = seg.meta().id;
2452
2453            assert_eq!(
2454                seg.store().num_docs(),
2455                seg.num_docs(),
2456                "store.num_docs != meta.num_docs after merge 1"
2457            );
2458
2459            for i in 0..total_docs as u32 {
2460                let doc = searcher
2461                    .doc(seg_id, i)
2462                    .await
2463                    .unwrap_or_else(|e| panic!("doc {} error: {}", i, e));
2464                assert!(doc.is_some(), "doc {} missing after merge 1", i);
2465                let doc = doc.unwrap();
2466                let t = doc
2467                    .get_first(title)
2468                    .unwrap_or_else(|| panic!("doc {} missing title", i));
2469                assert!(
2470                    t.as_text().unwrap().contains("searchterm"),
2471                    "doc {} title corrupt after merge 1",
2472                    i
2473                );
2474            }
2475        }
2476
2477        // === Round 2: add 150 more docs, merge again ===
2478        {
2479            let mut writer = IndexWriter::open(dir.clone(), config.clone())
2480                .await
2481                .unwrap();
2482            for batch in 0..3 {
2483                for i in 0..50 {
2484                    writer.add_document(make_doc(2, batch * 50 + i)).unwrap();
2485                }
2486                writer.commit().await.unwrap();
2487            }
2488            total_docs += 150;
2489            writer.force_merge().await.unwrap();
2490        }
2491
2492        // Verify every doc's fields after merge 2
2493        {
2494            let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2495            assert_eq!(index.num_docs().await.unwrap(), total_docs as u32);
2496
2497            let reader = index.reader().await.unwrap();
2498            let searcher = reader.searcher().await.unwrap();
2499            assert_eq!(
2500                searcher.num_segments(),
2501                1,
2502                "should be 1 segment after merge 2"
2503            );
2504            let seg = &searcher.segment_readers()[0];
2505            let seg_id = seg.meta().id;
2506
2507            assert_eq!(
2508                seg.store().num_docs(),
2509                seg.num_docs(),
2510                "store.num_docs != meta.num_docs after merge 2"
2511            );
2512
2513            for i in 0..total_docs as u32 {
2514                let doc = searcher
2515                    .doc(seg_id, i)
2516                    .await
2517                    .unwrap_or_else(|e| panic!("doc {} error: {}", i, e));
2518                assert!(doc.is_some(), "doc {} missing after merge 2", i);
2519                let doc = doc.unwrap();
2520                let t = doc
2521                    .get_first(title)
2522                    .unwrap_or_else(|| panic!("doc {} missing title", i));
2523                assert!(
2524                    t.as_text().unwrap().contains("searchterm"),
2525                    "doc {} title corrupt after merge 2",
2526                    i
2527                );
2528            }
2529        }
2530
2531        // === Round 3: add 100 more, merge a third time ===
2532        {
2533            let mut writer = IndexWriter::open(dir.clone(), config.clone())
2534                .await
2535                .unwrap();
2536            for batch in 0..2 {
2537                for i in 0..50 {
2538                    writer.add_document(make_doc(3, batch * 50 + i)).unwrap();
2539                }
2540                writer.commit().await.unwrap();
2541            }
2542            total_docs += 100;
2543            writer.force_merge().await.unwrap();
2544        }
2545
2546        // Verify every doc's fields after merge 3
2547        {
2548            let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2549            assert_eq!(index.num_docs().await.unwrap(), total_docs as u32);
2550
2551            let reader = index.reader().await.unwrap();
2552            let searcher = reader.searcher().await.unwrap();
2553            assert_eq!(
2554                searcher.num_segments(),
2555                1,
2556                "should be 1 segment after merge 3"
2557            );
2558            let seg = &searcher.segment_readers()[0];
2559            let seg_id = seg.meta().id;
2560
2561            assert_eq!(
2562                seg.store().num_docs(),
2563                seg.num_docs(),
2564                "store.num_docs != meta.num_docs after merge 3"
2565            );
2566
2567            let mut missing = 0;
2568            let mut corrupt = 0;
2569            for i in 0..total_docs as u32 {
2570                match searcher.doc(seg_id, i).await {
2571                    Ok(Some(doc)) => {
2572                        if let Some(t) = doc.get_first(title) {
2573                            if !t.as_text().unwrap_or("").contains("searchterm") {
2574                                corrupt += 1;
2575                            }
2576                        } else {
2577                            corrupt += 1;
2578                        }
2579                    }
2580                    Ok(None) => missing += 1,
2581                    Err(e) => panic!("doc {} error after merge 3: {}", i, e),
2582                }
2583            }
2584            assert_eq!(
2585                missing, 0,
2586                "merge 3: {} of {} docs missing from store",
2587                missing, total_docs
2588            );
2589            assert_eq!(
2590                corrupt, 0,
2591                "merge 3: {} of {} docs have corrupt fields",
2592                corrupt, total_docs
2593            );
2594        }
2595
2596        eprintln!("All {} docs verified across 3 merge rounds", total_docs);
2597    }
2598
2599    /// Large-scale store test: 3200 docs (~1 KB each) → many store blocks per segment.
2600    /// Verifies every doc's fields survive 4 merge rounds with MmapDirectory.
2601    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2602    async fn test_store_large_scale_multi_merge() {
2603        use crate::directories::MmapDirectory;
2604
2605        let tmp_dir = tempfile::tempdir().unwrap();
2606        let dir = MmapDirectory::new(tmp_dir.path());
2607
2608        let mut schema_builder = SchemaBuilder::default();
2609        let title = schema_builder.add_text_field("title", true, true);
2610        let body = schema_builder.add_text_field("body", false, true);
2611        let schema = schema_builder.build();
2612
2613        let config = IndexConfig {
2614            max_indexing_memory_bytes: 1024 * 256,
2615            num_indexing_threads: 2,
2616            merge_policy: Box::new(crate::merge::NoMergePolicy),
2617            ..Default::default()
2618        };
2619
2620        // ~1KB per doc → ~256 docs per 256KB store block → 800 docs ≈ 3 blocks
2621        let make_doc = |round: usize, idx: usize| -> Document {
2622            let mut doc = Document::new();
2623            doc.add_text(title, format!("r{}_i{}_needle", round, idx));
2624            doc.add_text(body, format!("r{}i{} {}", round, idx, "x".repeat(950)));
2625            doc
2626        };
2627
2628        let mut total_docs = 0u32;
2629
2630        for round in 0..4 {
2631            let docs_this_round = 800u32;
2632
2633            // Add docs across multiple segments
2634            {
2635                let mut writer = if round == 0 {
2636                    IndexWriter::create(dir.clone(), schema.clone(), config.clone())
2637                        .await
2638                        .unwrap()
2639                } else {
2640                    IndexWriter::open(dir.clone(), config.clone())
2641                        .await
2642                        .unwrap()
2643                };
2644                for batch in 0..4 {
2645                    for i in 0..docs_this_round / 4 {
2646                        writer
2647                            .add_document(make_doc(
2648                                round,
2649                                (batch * (docs_this_round / 4) + i) as usize,
2650                            ))
2651                            .unwrap();
2652                    }
2653                    writer.commit().await.unwrap();
2654                }
2655                total_docs += docs_this_round;
2656                writer.force_merge().await.unwrap();
2657            }
2658
2659            // Verify every doc's fields in the merged segment
2660            {
2661                let index = Index::open(dir.clone(), config.clone()).await.unwrap();
2662                assert_eq!(index.num_docs().await.unwrap(), total_docs);
2663                let reader = index.reader().await.unwrap();
2664                let searcher = reader.searcher().await.unwrap();
2665                assert_eq!(
2666                    searcher.num_segments(),
2667                    1,
2668                    "round {}: expected 1 segment",
2669                    round
2670                );
2671                let seg = &searcher.segment_readers()[0];
2672                let seg_id = seg.meta().id;
2673                assert_eq!(
2674                    seg.store().num_docs(),
2675                    seg.num_docs(),
2676                    "round {}: store/meta mismatch",
2677                    round
2678                );
2679                let mut missing = 0u32;
2680                for i in 0..total_docs {
2681                    match searcher.doc(seg_id, i).await {
2682                        Ok(Some(doc)) => {
2683                            let t = doc.get_first(title);
2684                            assert!(
2685                                t.is_some() && t.unwrap().as_text().unwrap().contains("needle"),
2686                                "round {}: doc {} corrupt",
2687                                round,
2688                                i
2689                            );
2690                        }
2691                        Ok(None) => missing += 1,
2692                        Err(e) => panic!("round {}: doc {} error: {}", round, i, e),
2693                    }
2694                }
2695                assert_eq!(
2696                    missing, 0,
2697                    "round {}: {} of {} docs missing",
2698                    round, missing, total_docs
2699                );
2700            }
2701
2702            eprintln!(
2703                "Round {}: {} docs verified ({} total)",
2704                round, docs_this_round, total_docs
2705            );
2706        }
2707        eprintln!(
2708            "All {} docs verified across 4 large-scale merge rounds",
2709            total_docs
2710        );
2711    }
2712}