Skip to main content

hermes_core/index/
mod.rs

//! Index - multi-segment async search index
//!
//! The `Index` is the central concept that provides:
//! - `Index::create()` / `Index::open()` - create or open an index
//! - `index.writer()` - get an IndexWriter for adding documents
//! - `index.reader()` - get an IndexReader for searching (with reload policy)
//!
//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use crate::structures::{CoarseCentroids, PQCodebook};
16#[cfg(feature = "native")]
17use rustc_hash::FxHashMap;
18#[cfg(feature = "native")]
19use std::sync::Arc;
20
21mod searcher;
22pub use searcher::Searcher;
23
24#[cfg(feature = "native")]
25mod reader;
26#[cfg(feature = "native")]
27mod vector_builder;
28#[cfg(feature = "native")]
29mod writer;
30#[cfg(feature = "native")]
31pub use reader::IndexReader;
32#[cfg(feature = "native")]
33pub use writer::IndexWriter;
34
35mod metadata;
36pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
37
38#[cfg(feature = "native")]
39mod helpers;
40#[cfg(feature = "native")]
41pub use helpers::{
42    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
43    index_documents_from_reader, index_json_document, parse_schema,
44};
45
/// File name used by default when the slice cache is stored alongside an index.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
48
49/// Index configuration
50#[derive(Debug, Clone)]
51pub struct IndexConfig {
52    /// Number of threads for CPU-intensive tasks (search parallelism)
53    pub num_threads: usize,
54    /// Number of parallel segment builders (documents distributed round-robin)
55    pub num_indexing_threads: usize,
56    /// Number of threads for parallel block compression within each segment
57    pub num_compression_threads: usize,
58    /// Block cache size for term dictionary per segment
59    pub term_cache_blocks: usize,
60    /// Block cache size for document store per segment
61    pub store_cache_blocks: usize,
62    /// Max memory (bytes) across all builders before auto-commit (global limit)
63    pub max_indexing_memory_bytes: usize,
64    /// Merge policy for background segment merging
65    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
66    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
67    pub optimization: crate::structures::IndexOptimization,
68    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
69    pub reload_interval_ms: u64,
70}
71
72impl Default for IndexConfig {
73    fn default() -> Self {
74        #[cfg(feature = "native")]
75        let cpus = num_cpus::get().max(1);
76        #[cfg(not(feature = "native"))]
77        let cpus = 1;
78
79        Self {
80            num_threads: cpus,
81            num_indexing_threads: 1,
82            num_compression_threads: cpus,
83            term_cache_blocks: 256,
84            store_cache_blocks: 32,
85            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
86            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
87            optimization: crate::structures::IndexOptimization::default(),
88            reload_interval_ms: 1000, // 1 second default
89        }
90    }
91}
92
/// Asynchronous search index made of multiple segments.
///
/// This is the entry point for both indexing and searching:
/// - `Index::create()` / `Index::open()` build a new index or open an existing one
/// - `index.writer()` hands out an IndexWriter for adding documents
/// - `index.reader()` hands out an IndexReader that searches with a reload policy
///
/// Segment bookkeeping itself lives in the SegmentManager this type holds.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Storage backend shared with the segment manager.
    directory: Arc<D>,
    /// Schema describing the fields of this index.
    schema: Arc<Schema>,
    /// Configuration supplied when the index was created or opened.
    config: IndexConfig,
    /// Owner of the segments, the tracker and the metadata.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Trained coarse centroids used by vector search, keyed by `u32`
    /// (logged as "fields" in `Index::open`).
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained PQ codebooks used by vector search, keyed the same way.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Reader built on first use and shared by every later `reader()` call.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118    /// Create a new index in the directory
119    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120        let directory = Arc::new(directory);
121        let schema = Arc::new(schema);
122        let metadata = IndexMetadata::new((*schema).clone());
123
124        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125            Arc::clone(&directory),
126            Arc::clone(&schema),
127            metadata,
128            config.merge_policy.clone_box(),
129            config.term_cache_blocks,
130        ));
131
132        // Save initial metadata
133        segment_manager.update_metadata(|_| {}).await?;
134
135        Ok(Self {
136            directory,
137            schema,
138            config,
139            segment_manager,
140            trained_centroids: FxHashMap::default(),
141            trained_codebooks: FxHashMap::default(),
142            cached_reader: tokio::sync::OnceCell::new(),
143        })
144    }
145
146    /// Open an existing index from a directory
147    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
148        let directory = Arc::new(directory);
149
150        // Load metadata (includes schema)
151        let metadata = IndexMetadata::load(directory.as_ref()).await?;
152        let schema = Arc::new(metadata.schema.clone());
153
154        // Load trained structures
155        let trained = metadata.load_trained_structures(directory.as_ref()).await;
156        let trained_centroids = trained
157            .as_ref()
158            .map(|t| t.centroids.clone())
159            .unwrap_or_default();
160        let trained_codebooks = trained
161            .as_ref()
162            .map(|t| t.codebooks.clone())
163            .unwrap_or_default();
164
165        log::debug!(
166            "[Index::open] trained_centroids fields={:?}, trained_codebooks fields={:?}",
167            trained_centroids.keys().collect::<Vec<_>>(),
168            trained_codebooks.keys().collect::<Vec<_>>(),
169        );
170
171        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
172            Arc::clone(&directory),
173            Arc::clone(&schema),
174            metadata,
175            config.merge_policy.clone_box(),
176            config.term_cache_blocks,
177        ));
178
179        Ok(Self {
180            directory,
181            schema,
182            config,
183            segment_manager,
184            trained_centroids,
185            trained_codebooks,
186            cached_reader: tokio::sync::OnceCell::new(),
187        })
188    }
189
190    /// Get the schema
191    pub fn schema(&self) -> &Schema {
192        &self.schema
193    }
194
195    /// Get a reference to the underlying directory
196    pub fn directory(&self) -> &D {
197        &self.directory
198    }
199
200    /// Get the segment manager
201    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
202        &self.segment_manager
203    }
204
205    /// Get an IndexReader for searching (with reload policy)
206    ///
207    /// The reader is cached and reused across calls. The reader's internal
208    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
209    pub async fn reader(&self) -> Result<&IndexReader<D>> {
210        self.cached_reader
211            .get_or_try_init(|| async {
212                IndexReader::from_segment_manager(
213                    Arc::clone(&self.schema),
214                    Arc::clone(&self.segment_manager),
215                    self.config.term_cache_blocks,
216                    self.config.reload_interval_ms,
217                )
218                .await
219            })
220            .await
221    }
222
223    /// Get the config
224    pub fn config(&self) -> &IndexConfig {
225        &self.config
226    }
227
228    /// Get trained centroids
229    pub fn trained_centroids(&self) -> &FxHashMap<u32, Arc<CoarseCentroids>> {
230        &self.trained_centroids
231    }
232
233    /// Get trained codebooks
234    pub fn trained_codebooks(&self) -> &FxHashMap<u32, Arc<PQCodebook>> {
235        &self.trained_codebooks
236    }
237
238    /// Get segment readers for query execution (convenience method)
239    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
240        let reader = self.reader().await?;
241        let searcher = reader.searcher().await?;
242        Ok(searcher.segment_readers().to_vec())
243    }
244
245    /// Total number of documents across all segments
246    pub async fn num_docs(&self) -> Result<u32> {
247        let reader = self.reader().await?;
248        let searcher = reader.searcher().await?;
249        Ok(searcher.num_docs())
250    }
251
252    /// Get default fields for search
253    pub fn default_fields(&self) -> Vec<crate::Field> {
254        if !self.schema.default_fields().is_empty() {
255            self.schema.default_fields().to_vec()
256        } else {
257            self.schema
258                .fields()
259                .filter(|(_, entry)| {
260                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
261                })
262                .map(|(field, _)| field)
263                .collect()
264        }
265    }
266
267    /// Get tokenizer registry
268    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
269        Arc::new(crate::tokenizer::TokenizerRegistry::default())
270    }
271
272    /// Create a query parser for this index
273    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
274        let default_fields = self.default_fields();
275        let tokenizers = self.tokenizers();
276
277        let query_routers = self.schema.query_routers();
278        if !query_routers.is_empty()
279            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
280        {
281            return crate::dsl::QueryLanguageParser::with_router(
282                Arc::clone(&self.schema),
283                default_fields,
284                tokenizers,
285                router,
286            );
287        }
288
289        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
290    }
291
292    /// Parse and search using a query string
293    pub async fn query(
294        &self,
295        query_str: &str,
296        limit: usize,
297    ) -> Result<crate::query::SearchResponse> {
298        self.query_offset(query_str, limit, 0).await
299    }
300
301    /// Query with offset for pagination
302    pub async fn query_offset(
303        &self,
304        query_str: &str,
305        limit: usize,
306        offset: usize,
307    ) -> Result<crate::query::SearchResponse> {
308        let parser = self.query_parser();
309        let query = parser
310            .parse(query_str)
311            .map_err(crate::error::Error::Query)?;
312        self.search_offset(query.as_ref(), limit, offset).await
313    }
314
315    /// Search and return results
316    pub async fn search(
317        &self,
318        query: &dyn crate::query::Query,
319        limit: usize,
320    ) -> Result<crate::query::SearchResponse> {
321        self.search_offset(query, limit, 0).await
322    }
323
324    /// Search with offset for pagination
325    pub async fn search_offset(
326        &self,
327        query: &dyn crate::query::Query,
328        limit: usize,
329        offset: usize,
330    ) -> Result<crate::query::SearchResponse> {
331        let reader = self.reader().await?;
332        let searcher = reader.searcher().await?;
333        let segments = searcher.segment_readers();
334
335        let fetch_limit = offset + limit;
336
337        let futures: Vec<_> = segments
338            .iter()
339            .map(|segment| {
340                let sid = segment.meta().id;
341                async move {
342                    let results =
343                        crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
344                    Ok::<_, crate::error::Error>(
345                        results
346                            .into_iter()
347                            .map(move |r| (sid, r))
348                            .collect::<Vec<_>>(),
349                    )
350                }
351            })
352            .collect();
353
354        let batches = futures::future::try_join_all(futures).await?;
355        let mut all_results: Vec<(u128, crate::query::SearchResult)> =
356            Vec::with_capacity(batches.iter().map(|b| b.len()).sum());
357        for batch in batches {
358            all_results.extend(batch);
359        }
360
361        all_results.sort_by(|a, b| {
362            b.1.score
363                .partial_cmp(&a.1.score)
364                .unwrap_or(std::cmp::Ordering::Equal)
365        });
366
367        let total_hits = all_results.len() as u32;
368
369        let hits: Vec<crate::query::SearchHit> = all_results
370            .into_iter()
371            .skip(offset)
372            .take(limit)
373            .map(|(segment_id, result)| crate::query::SearchHit {
374                address: crate::query::DocAddress::new(segment_id, result.doc_id),
375                score: result.score,
376                matched_fields: result.extract_ordinals(),
377            })
378            .collect();
379
380        Ok(crate::query::SearchResponse { hits, total_hits })
381    }
382
383    /// Get a document by its unique address
384    pub async fn get_document(
385        &self,
386        address: &crate::query::DocAddress,
387    ) -> Result<Option<crate::dsl::Document>> {
388        let reader = self.reader().await?;
389        let searcher = reader.searcher().await?;
390        searcher.get_document(address).await
391    }
392
393    /// Reload is no longer needed - reader handles this automatically
394    pub async fn reload(&self) -> Result<()> {
395        // No-op - reader reloads automatically based on policy
396        Ok(())
397    }
398
399    /// Get posting lists for a term across all segments
400    pub async fn get_postings(
401        &self,
402        field: crate::Field,
403        term: &[u8],
404    ) -> Result<
405        Vec<(
406            Arc<crate::segment::SegmentReader>,
407            crate::structures::BlockPostingList,
408        )>,
409    > {
410        let segments = self.segment_readers().await?;
411        let mut results = Vec::new();
412
413        for segment in segments {
414            if let Some(postings) = segment.get_postings(field, term).await? {
415                results.push((segment, postings));
416            }
417        }
418
419        Ok(results)
420    }
421}
422
/// Writer access for Index (only built with the `native` feature)
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Create an IndexWriter bound to this index, for adding documents.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        IndexWriter::from_index(self)
    }
}
431
432// TODO: Add back warmup_and_save_slice_cache when slice caching is re-integrated
433
434#[cfg(test)]
435mod tests {
436    use super::*;
437    use crate::directories::RamDirectory;
438    use crate::dsl::{Document, SchemaBuilder};
439
440    #[tokio::test]
441    async fn test_index_create_and_search() {
442        let mut schema_builder = SchemaBuilder::default();
443        let title = schema_builder.add_text_field("title", true, true);
444        let body = schema_builder.add_text_field("body", true, true);
445        let schema = schema_builder.build();
446
447        let dir = RamDirectory::new();
448        let config = IndexConfig::default();
449
450        // Create index and add documents
451        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
452            .await
453            .unwrap();
454
455        let mut doc1 = Document::new();
456        doc1.add_text(title, "Hello World");
457        doc1.add_text(body, "This is the first document");
458        writer.add_document(doc1).unwrap();
459
460        let mut doc2 = Document::new();
461        doc2.add_text(title, "Goodbye World");
462        doc2.add_text(body, "This is the second document");
463        writer.add_document(doc2).unwrap();
464
465        writer.commit().await.unwrap();
466
467        // Open for reading
468        let index = Index::open(dir, config).await.unwrap();
469        assert_eq!(index.num_docs().await.unwrap(), 2);
470
471        // Check postings
472        let postings = index.get_postings(title, b"world").await.unwrap();
473        assert_eq!(postings.len(), 1); // One segment
474        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
475
476        // Retrieve document via searcher snapshot
477        let reader = index.reader().await.unwrap();
478        let searcher = reader.searcher().await.unwrap();
479        let doc = searcher.doc(0).await.unwrap().unwrap();
480        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
481    }
482
483    #[tokio::test]
484    async fn test_multiple_segments() {
485        let mut schema_builder = SchemaBuilder::default();
486        let title = schema_builder.add_text_field("title", true, true);
487        let schema = schema_builder.build();
488
489        let dir = RamDirectory::new();
490        let config = IndexConfig {
491            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
492            ..Default::default()
493        };
494
495        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
496            .await
497            .unwrap();
498
499        // Add documents in batches to create multiple segments
500        for batch in 0..3 {
501            for i in 0..5 {
502                let mut doc = Document::new();
503                doc.add_text(title, format!("Document {} batch {}", i, batch));
504                writer.add_document(doc).unwrap();
505            }
506            writer.commit().await.unwrap();
507        }
508
509        // Open and check
510        let index = Index::open(dir, config).await.unwrap();
511        assert_eq!(index.num_docs().await.unwrap(), 15);
512        // With queue-based indexing, exact segment count varies
513        assert!(
514            index.segment_readers().await.unwrap().len() >= 2,
515            "Expected multiple segments"
516        );
517    }
518
519    #[tokio::test]
520    async fn test_segment_merge() {
521        let mut schema_builder = SchemaBuilder::default();
522        let title = schema_builder.add_text_field("title", true, true);
523        let schema = schema_builder.build();
524
525        let dir = RamDirectory::new();
526        let config = IndexConfig {
527            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
528            ..Default::default()
529        };
530
531        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
532            .await
533            .unwrap();
534
535        // Create multiple segments by flushing between batches
536        for batch in 0..3 {
537            for i in 0..3 {
538                let mut doc = Document::new();
539                doc.add_text(title, format!("Document {} batch {}", i, batch));
540                writer.add_document(doc).unwrap();
541            }
542            writer.flush().await.unwrap();
543        }
544        writer.commit().await.unwrap();
545
546        // Should have multiple segments (at least 2, one per flush with docs)
547        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
548        assert!(
549            index.segment_readers().await.unwrap().len() >= 2,
550            "Expected multiple segments"
551        );
552
553        // Force merge
554        let writer = IndexWriter::open(dir.clone(), config.clone())
555            .await
556            .unwrap();
557        writer.force_merge().await.unwrap();
558
559        // Should have 1 segment now
560        let index = Index::open(dir, config).await.unwrap();
561        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
562        assert_eq!(index.num_docs().await.unwrap(), 9);
563
564        // Verify all documents accessible (order may vary with queue-based indexing)
565        let reader = index.reader().await.unwrap();
566        let searcher = reader.searcher().await.unwrap();
567        let mut found_docs = 0;
568        for i in 0..9 {
569            if searcher.doc(i).await.unwrap().is_some() {
570                found_docs += 1;
571            }
572        }
573        assert_eq!(found_docs, 9);
574    }
575
576    #[tokio::test]
577    async fn test_match_query() {
578        let mut schema_builder = SchemaBuilder::default();
579        let title = schema_builder.add_text_field("title", true, true);
580        let body = schema_builder.add_text_field("body", true, true);
581        let schema = schema_builder.build();
582
583        let dir = RamDirectory::new();
584        let config = IndexConfig::default();
585
586        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
587            .await
588            .unwrap();
589
590        let mut doc1 = Document::new();
591        doc1.add_text(title, "rust programming");
592        doc1.add_text(body, "Learn rust language");
593        writer.add_document(doc1).unwrap();
594
595        let mut doc2 = Document::new();
596        doc2.add_text(title, "python programming");
597        doc2.add_text(body, "Learn python language");
598        writer.add_document(doc2).unwrap();
599
600        writer.commit().await.unwrap();
601
602        let index = Index::open(dir, config).await.unwrap();
603
604        // Test match query with multiple default fields
605        let results = index.query("rust", 10).await.unwrap();
606        assert_eq!(results.hits.len(), 1);
607
608        // Test match query with multiple tokens
609        let results = index.query("rust programming", 10).await.unwrap();
610        assert!(!results.hits.is_empty());
611
612        // Verify hit has address (segment_id + doc_id)
613        let hit = &results.hits[0];
614        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
615
616        // Verify document retrieval by address
617        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
618        assert!(
619            !doc.field_values().is_empty(),
620            "Doc should have field values"
621        );
622
623        // Also verify doc retrieval via searcher snapshot
624        let reader = index.reader().await.unwrap();
625        let searcher = reader.searcher().await.unwrap();
626        let doc = searcher.doc(0).await.unwrap().unwrap();
627        assert!(
628            !doc.field_values().is_empty(),
629            "Doc should have field values"
630        );
631    }
632
633    #[tokio::test]
634    async fn test_slice_cache_warmup_and_load() {
635        use crate::directories::SliceCachingDirectory;
636
637        let mut schema_builder = SchemaBuilder::default();
638        let title = schema_builder.add_text_field("title", true, true);
639        let body = schema_builder.add_text_field("body", true, true);
640        let schema = schema_builder.build();
641
642        let dir = RamDirectory::new();
643        let config = IndexConfig::default();
644
645        // Create index with some documents
646        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
647            .await
648            .unwrap();
649
650        for i in 0..10 {
651            let mut doc = Document::new();
652            doc.add_text(title, format!("Document {} about rust", i));
653            doc.add_text(body, format!("This is body text number {}", i));
654            writer.add_document(doc).unwrap();
655        }
656        writer.commit().await.unwrap();
657
658        // Open with slice caching and perform some operations to warm up cache
659        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
660        let index = Index::open(caching_dir, config.clone()).await.unwrap();
661
662        // Perform a search to warm up the cache
663        let results = index.query("rust", 10).await.unwrap();
664        assert!(!results.hits.is_empty());
665
666        // Check cache stats - should have cached some data
667        let stats = index.directory.stats();
668        assert!(stats.total_bytes > 0, "Cache should have data after search");
669    }
670
671    #[tokio::test]
672    async fn test_multivalue_field_indexing_and_search() {
673        let mut schema_builder = SchemaBuilder::default();
674        let uris = schema_builder.add_text_field("uris", true, true);
675        let title = schema_builder.add_text_field("title", true, true);
676        let schema = schema_builder.build();
677
678        let dir = RamDirectory::new();
679        let config = IndexConfig::default();
680
681        // Create index and add document with multi-value field
682        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
683            .await
684            .unwrap();
685
686        let mut doc = Document::new();
687        doc.add_text(uris, "one");
688        doc.add_text(uris, "two");
689        doc.add_text(title, "Test Document");
690        writer.add_document(doc).unwrap();
691
692        // Add another document with different uris
693        let mut doc2 = Document::new();
694        doc2.add_text(uris, "three");
695        doc2.add_text(title, "Another Document");
696        writer.add_document(doc2).unwrap();
697
698        writer.commit().await.unwrap();
699
700        // Open for reading
701        let index = Index::open(dir, config).await.unwrap();
702        assert_eq!(index.num_docs().await.unwrap(), 2);
703
704        // Verify document retrieval preserves all values
705        let reader = index.reader().await.unwrap();
706        let searcher = reader.searcher().await.unwrap();
707        let doc = searcher.doc(0).await.unwrap().unwrap();
708        let all_uris: Vec<_> = doc.get_all(uris).collect();
709        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
710        assert_eq!(all_uris[0].as_text(), Some("one"));
711        assert_eq!(all_uris[1].as_text(), Some("two"));
712
713        // Verify to_json returns array for multi-value field
714        let json = doc.to_json(index.schema());
715        let uris_json = json.get("uris").unwrap();
716        assert!(uris_json.is_array(), "Multi-value field should be an array");
717        let uris_arr = uris_json.as_array().unwrap();
718        assert_eq!(uris_arr.len(), 2);
719        assert_eq!(uris_arr[0].as_str(), Some("one"));
720        assert_eq!(uris_arr[1].as_str(), Some("two"));
721
722        // Verify both values are searchable
723        let results = index.query("uris:one", 10).await.unwrap();
724        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
725        assert_eq!(results.hits[0].address.doc_id, 0);
726
727        let results = index.query("uris:two", 10).await.unwrap();
728        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
729        assert_eq!(results.hits[0].address.doc_id, 0);
730
731        let results = index.query("uris:three", 10).await.unwrap();
732        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
733        assert_eq!(results.hits[0].address.doc_id, 1);
734
735        // Verify searching for non-existent value returns no results
736        let results = index.query("uris:nonexistent", 10).await.unwrap();
737        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
738    }
739
740    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
741    ///
742    /// This test verifies that:
743    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
744    /// 2. Search results are correct regardless of WAND optimization
745    /// 3. Scores are reasonable for matching documents
746    #[tokio::test]
747    async fn test_wand_optimization_for_or_queries() {
748        use crate::query::{BooleanQuery, TermQuery};
749
750        let mut schema_builder = SchemaBuilder::default();
751        let content = schema_builder.add_text_field("content", true, true);
752        let schema = schema_builder.build();
753
754        let dir = RamDirectory::new();
755        let config = IndexConfig::default();
756
757        // Create index with documents containing various terms
758        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
759            .await
760            .unwrap();
761
762        // Doc 0: contains "rust" and "programming"
763        let mut doc = Document::new();
764        doc.add_text(content, "rust programming language is fast");
765        writer.add_document(doc).unwrap();
766
767        // Doc 1: contains "rust" only
768        let mut doc = Document::new();
769        doc.add_text(content, "rust is a systems language");
770        writer.add_document(doc).unwrap();
771
772        // Doc 2: contains "programming" only
773        let mut doc = Document::new();
774        doc.add_text(content, "programming is fun");
775        writer.add_document(doc).unwrap();
776
777        // Doc 3: contains "python" (neither rust nor programming)
778        let mut doc = Document::new();
779        doc.add_text(content, "python is easy to learn");
780        writer.add_document(doc).unwrap();
781
782        // Doc 4: contains both "rust" and "programming" multiple times
783        let mut doc = Document::new();
784        doc.add_text(content, "rust rust programming programming systems");
785        writer.add_document(doc).unwrap();
786
787        writer.commit().await.unwrap();
788
789        // Open for reading
790        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
791
792        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
793        let or_query = BooleanQuery::new()
794            .should(TermQuery::text(content, "rust"))
795            .should(TermQuery::text(content, "programming"));
796
797        let results = index.search(&or_query, 10).await.unwrap();
798
799        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
800        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
801
802        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
803        assert!(doc_ids.contains(&0), "Should find doc 0");
804        assert!(doc_ids.contains(&1), "Should find doc 1");
805        assert!(doc_ids.contains(&2), "Should find doc 2");
806        assert!(doc_ids.contains(&4), "Should find doc 4");
807        assert!(
808            !doc_ids.contains(&3),
809            "Should NOT find doc 3 (only has 'python')"
810        );
811
812        // Test 2: Single term query (should NOT use WAND, but still work)
813        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
814
815        let results = index.search(&single_query, 10).await.unwrap();
816        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
817
818        // Test 3: Query with MUST (should NOT use WAND)
819        let must_query = BooleanQuery::new()
820            .must(TermQuery::text(content, "rust"))
821            .should(TermQuery::text(content, "programming"));
822
823        let results = index.search(&must_query, 10).await.unwrap();
824        // Must have "rust", optionally "programming"
825        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
826
827        // Test 4: Query with MUST_NOT (should NOT use WAND)
828        let must_not_query = BooleanQuery::new()
829            .should(TermQuery::text(content, "rust"))
830            .should(TermQuery::text(content, "programming"))
831            .must_not(TermQuery::text(content, "systems"));
832
833        let results = index.search(&must_not_query, 10).await.unwrap();
834        // Should exclude docs with "systems" (doc 1 and 4)
835        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
836        assert!(
837            !doc_ids.contains(&1),
838            "Should NOT find doc 1 (has 'systems')"
839        );
840        assert!(
841            !doc_ids.contains(&4),
842            "Should NOT find doc 4 (has 'systems')"
843        );
844
845        // Test 5: Verify top-k limit works correctly with WAND
846        let or_query = BooleanQuery::new()
847            .should(TermQuery::text(content, "rust"))
848            .should(TermQuery::text(content, "programming"));
849
850        let results = index.search(&or_query, 2).await.unwrap();
851        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
852
853        // Top results should be docs that match both terms (higher scores)
854        // Doc 0 and 4 contain both "rust" and "programming"
855    }
856
857    /// Test that WAND optimization produces same results as non-WAND for correctness
858    #[tokio::test]
859    async fn test_wand_results_match_standard_boolean() {
860        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
861
862        let mut schema_builder = SchemaBuilder::default();
863        let content = schema_builder.add_text_field("content", true, true);
864        let schema = schema_builder.build();
865
866        let dir = RamDirectory::new();
867        let config = IndexConfig::default();
868
869        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
870            .await
871            .unwrap();
872
873        // Add several documents
874        for i in 0..10 {
875            let mut doc = Document::new();
876            let text = match i % 4 {
877                0 => "apple banana cherry",
878                1 => "apple orange",
879                2 => "banana grape",
880                _ => "cherry date",
881            };
882            doc.add_text(content, text);
883            writer.add_document(doc).unwrap();
884        }
885
886        writer.commit().await.unwrap();
887        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
888
889        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
890        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
891
892        let bool_query = BooleanQuery::new()
893            .should(TermQuery::text(content, "apple"))
894            .should(TermQuery::text(content, "banana"));
895
896        let wand_results = index.search(&wand_query, 10).await.unwrap();
897        let bool_results = index.search(&bool_query, 10).await.unwrap();
898
899        // Both should find the same documents
900        assert_eq!(
901            wand_results.hits.len(),
902            bool_results.hits.len(),
903            "WAND and Boolean should find same number of docs"
904        );
905
906        let wand_docs: std::collections::HashSet<u32> =
907            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
908        let bool_docs: std::collections::HashSet<u32> =
909            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
910
911        assert_eq!(
912            wand_docs, bool_docs,
913            "WAND and Boolean should find same documents"
914        );
915    }
916
917    #[tokio::test]
918    async fn test_vector_index_threshold_switch() {
919        use crate::dsl::{DenseVectorConfig, DenseVectorQuantization, VectorIndexType};
920
921        // Create schema with dense vector field configured for IVF-RaBitQ
922        let mut schema_builder = SchemaBuilder::default();
923        let title = schema_builder.add_text_field("title", true, true);
924        let embedding = schema_builder.add_dense_vector_field_with_config(
925            "embedding",
926            true, // indexed
927            true, // stored
928            DenseVectorConfig {
929                dim: 8,
930                index_type: VectorIndexType::IvfRaBitQ,
931                quantization: DenseVectorQuantization::F32,
932                num_clusters: Some(4), // Small for test
933                nprobe: 2,
934                build_threshold: Some(50), // Build when we have 50+ vectors
935            },
936        );
937        let schema = schema_builder.build();
938
939        let dir = RamDirectory::new();
940        let config = IndexConfig::default();
941
942        // Phase 1: Add vectors below threshold (should use Flat index)
943        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
944            .await
945            .unwrap();
946
947        // Add 30 documents (below threshold of 50)
948        for i in 0..30 {
949            let mut doc = Document::new();
950            doc.add_text(title, format!("Document {}", i));
951            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
952            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
953            doc.add_dense_vector(embedding, vec);
954            writer.add_document(doc).unwrap();
955        }
956        writer.commit().await.unwrap();
957
958        // Open index and verify it's using Flat (not built yet)
959        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
960        assert!(
961            index.trained_centroids.is_empty(),
962            "Should not have trained centroids below threshold"
963        );
964
965        // Search should work with Flat index
966        let query_vec: Vec<f32> = vec![0.5; 8];
967        let segments = index.segment_readers().await.unwrap();
968        assert!(!segments.is_empty());
969
970        let results = segments[0]
971            .search_dense_vector(
972                embedding,
973                &query_vec,
974                5,
975                0,
976                1,
977                crate::query::MultiValueCombiner::Max,
978            )
979            .await
980            .unwrap();
981        assert!(!results.is_empty(), "Flat search should return results");
982
983        // Phase 2: Add more vectors to cross threshold
984        let writer = IndexWriter::open(dir.clone(), config.clone())
985            .await
986            .unwrap();
987
988        // Add 30 more documents (total 60, above threshold of 50)
989        for i in 30..60 {
990            let mut doc = Document::new();
991            doc.add_text(title, format!("Document {}", i));
992            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
993            doc.add_dense_vector(embedding, vec);
994            writer.add_document(doc).unwrap();
995        }
996        // Commit auto-triggers vector index build when threshold is crossed
997        writer.commit().await.unwrap();
998
999        // Verify centroids were trained (auto-triggered)
1000        assert!(
1001            writer.is_vector_index_built(embedding).await,
1002            "Vector index should be built after crossing threshold"
1003        );
1004
1005        // Reopen index and verify trained structures are loaded
1006        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1007        assert!(
1008            index.trained_centroids.contains_key(&embedding.0),
1009            "Should have loaded trained centroids for embedding field"
1010        );
1011
1012        // Search should still work
1013        let segments = index.segment_readers().await.unwrap();
1014        let results = segments[0]
1015            .search_dense_vector(
1016                embedding,
1017                &query_vec,
1018                5,
1019                0,
1020                1,
1021                crate::query::MultiValueCombiner::Max,
1022            )
1023            .await
1024            .unwrap();
1025        assert!(
1026            !results.is_empty(),
1027            "Search should return results after build"
1028        );
1029
1030        // Phase 3: Verify calling build_vector_index again is a no-op
1031        let writer = IndexWriter::open(dir.clone(), config.clone())
1032            .await
1033            .unwrap();
1034        writer.build_vector_index().await.unwrap(); // Should skip training
1035
1036        // Still built
1037        assert!(writer.is_vector_index_built(embedding).await);
1038    }
1039
1040    /// Multi-round merge: flush many small segments, merge, add more, merge again.
1041    /// Verifies search correctness (term + phrase queries) through multiple merge rounds.
1042    #[tokio::test]
1043    async fn test_multi_round_merge_with_search() {
1044        let mut schema_builder = SchemaBuilder::default();
1045        let title = schema_builder.add_text_field("title", true, true);
1046        let body = schema_builder.add_text_field("body", true, true);
1047        let schema = schema_builder.build();
1048
1049        let dir = RamDirectory::new();
1050        let config = IndexConfig {
1051            max_indexing_memory_bytes: 512,
1052            ..Default::default()
1053        };
1054
1055        // --- Round 1: 5 segments × 10 docs = 50 docs ---
1056        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1057            .await
1058            .unwrap();
1059
1060        for batch in 0..5 {
1061            for i in 0..10 {
1062                let mut doc = Document::new();
1063                doc.add_text(
1064                    title,
1065                    format!("alpha bravo charlie batch{} doc{}", batch, i),
1066                );
1067                doc.add_text(
1068                    body,
1069                    format!("the quick brown fox jumps over the lazy dog number {}", i),
1070                );
1071                writer.add_document(doc).unwrap();
1072            }
1073            writer.flush().await.unwrap();
1074        }
1075        writer.commit().await.unwrap();
1076
1077        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1078        let pre_merge_segments = index.segment_readers().await.unwrap().len();
1079        assert!(
1080            pre_merge_segments >= 3,
1081            "Expected >=3 segments, got {}",
1082            pre_merge_segments
1083        );
1084        assert_eq!(index.num_docs().await.unwrap(), 50);
1085
1086        // Search before merge
1087        let results = index.query("alpha", 100).await.unwrap();
1088        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'alpha'");
1089
1090        let results = index.query("fox", 100).await.unwrap();
1091        assert_eq!(results.hits.len(), 50, "all 50 docs should match 'fox'");
1092
1093        // --- Merge round 1 ---
1094        let writer = IndexWriter::open(dir.clone(), config.clone())
1095            .await
1096            .unwrap();
1097        writer.force_merge().await.unwrap();
1098
1099        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1100        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1101        assert_eq!(index.num_docs().await.unwrap(), 50);
1102
1103        // Search after first merge
1104        let results = index.query("alpha", 100).await.unwrap();
1105        assert_eq!(
1106            results.hits.len(),
1107            50,
1108            "all 50 docs should match 'alpha' after merge 1"
1109        );
1110
1111        let results = index.query("fox", 100).await.unwrap();
1112        assert_eq!(
1113            results.hits.len(),
1114            50,
1115            "all 50 docs should match 'fox' after merge 1"
1116        );
1117
1118        // Verify all docs retrievable
1119        let reader1 = index.reader().await.unwrap();
1120        let searcher1 = reader1.searcher().await.unwrap();
1121        for i in 0..50 {
1122            let doc = searcher1.doc(i).await.unwrap();
1123            assert!(doc.is_some(), "doc {} should exist after merge 1", i);
1124        }
1125
1126        // --- Round 2: add 30 more docs in 3 segments ---
1127        let writer = IndexWriter::open(dir.clone(), config.clone())
1128            .await
1129            .unwrap();
1130        for batch in 0..3 {
1131            for i in 0..10 {
1132                let mut doc = Document::new();
1133                doc.add_text(
1134                    title,
1135                    format!("delta echo foxtrot round2_batch{} doc{}", batch, i),
1136                );
1137                doc.add_text(
1138                    body,
1139                    format!("the quick brown fox jumps again number {}", i),
1140                );
1141                writer.add_document(doc).unwrap();
1142            }
1143            writer.flush().await.unwrap();
1144        }
1145        writer.commit().await.unwrap();
1146
1147        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1148        assert_eq!(index.num_docs().await.unwrap(), 80);
1149        assert!(
1150            index.segment_readers().await.unwrap().len() >= 2,
1151            "Should have >=2 segments after round 2 ingestion"
1152        );
1153
1154        // Search spans both old merged segment and new segments
1155        let results = index.query("fox", 100).await.unwrap();
1156        assert_eq!(results.hits.len(), 80, "all 80 docs should match 'fox'");
1157
1158        let results = index.query("alpha", 100).await.unwrap();
1159        assert_eq!(results.hits.len(), 50, "only round 1 docs match 'alpha'");
1160
1161        let results = index.query("delta", 100).await.unwrap();
1162        assert_eq!(results.hits.len(), 30, "only round 2 docs match 'delta'");
1163
1164        // --- Merge round 2 ---
1165        let writer = IndexWriter::open(dir.clone(), config.clone())
1166            .await
1167            .unwrap();
1168        writer.force_merge().await.unwrap();
1169
1170        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1171        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1172        assert_eq!(index.num_docs().await.unwrap(), 80);
1173
1174        // All searches still correct after second merge
1175        let results = index.query("fox", 100).await.unwrap();
1176        assert_eq!(results.hits.len(), 80, "all 80 docs after merge 2");
1177
1178        let results = index.query("alpha", 100).await.unwrap();
1179        assert_eq!(results.hits.len(), 50, "round 1 docs after merge 2");
1180
1181        let results = index.query("delta", 100).await.unwrap();
1182        assert_eq!(results.hits.len(), 30, "round 2 docs after merge 2");
1183
1184        // Verify all 80 docs retrievable
1185        let reader2 = index.reader().await.unwrap();
1186        let searcher2 = reader2.searcher().await.unwrap();
1187        for i in 0..80 {
1188            let doc = searcher2.doc(i).await.unwrap();
1189            assert!(doc.is_some(), "doc {} should exist after merge 2", i);
1190        }
1191    }
1192
1193    /// Large-scale merge: many segments with overlapping terms, verifying
1194    /// BM25 scoring and doc retrieval after merge.
1195    #[tokio::test]
1196    async fn test_large_scale_merge_correctness() {
1197        let mut schema_builder = SchemaBuilder::default();
1198        let title = schema_builder.add_text_field("title", true, true);
1199        let schema = schema_builder.build();
1200
1201        let dir = RamDirectory::new();
1202        let config = IndexConfig {
1203            max_indexing_memory_bytes: 512,
1204            ..Default::default()
1205        };
1206
1207        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1208            .await
1209            .unwrap();
1210
1211        // 8 batches × 25 docs = 200 docs total
1212        // Terms: "common" appears in all, "unique_N" appears in batch N only
1213        let total_docs = 200u32;
1214        for batch in 0..8 {
1215            for i in 0..25 {
1216                let mut doc = Document::new();
1217                doc.add_text(
1218                    title,
1219                    format!("common shared term unique_{} item{}", batch, i),
1220                );
1221                writer.add_document(doc).unwrap();
1222            }
1223            writer.flush().await.unwrap();
1224        }
1225        writer.commit().await.unwrap();
1226
1227        // Verify pre-merge
1228        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1229        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1230
1231        let results = index.query("common", 300).await.unwrap();
1232        assert_eq!(
1233            results.hits.len(),
1234            total_docs as usize,
1235            "all docs should match 'common'"
1236        );
1237
1238        // Each unique_N matches exactly 25 docs
1239        for batch in 0..8 {
1240            let q = format!("unique_{}", batch);
1241            let results = index.query(&q, 100).await.unwrap();
1242            assert_eq!(results.hits.len(), 25, "'{}' should match 25 docs", q);
1243        }
1244
1245        // Force merge
1246        let writer = IndexWriter::open(dir.clone(), config.clone())
1247            .await
1248            .unwrap();
1249        writer.force_merge().await.unwrap();
1250
1251        // Verify post-merge: single segment, same results
1252        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1253        assert_eq!(index.segment_readers().await.unwrap().len(), 1);
1254        assert_eq!(index.num_docs().await.unwrap(), total_docs);
1255
1256        let results = index.query("common", 300).await.unwrap();
1257        assert_eq!(results.hits.len(), total_docs as usize);
1258
1259        for batch in 0..8 {
1260            let q = format!("unique_{}", batch);
1261            let results = index.query(&q, 100).await.unwrap();
1262            assert_eq!(results.hits.len(), 25, "'{}' after merge", q);
1263        }
1264
1265        // Verify doc retrieval for every doc
1266        let reader = index.reader().await.unwrap();
1267        let searcher = reader.searcher().await.unwrap();
1268        for i in 0..total_docs {
1269            let doc = searcher.doc(i).await.unwrap();
1270            assert!(doc.is_some(), "doc {} missing after merge", i);
1271        }
1272    }
1273}