Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! Components:
4//! - Index: main entry point for searching
5//! - IndexWriter: for adding documents and committing segments (native only)
6//! - Supports multiple segments with merge
7
8use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16use crate::directories::{Directory, SliceCachingDirectory};
17use crate::dsl::{Document, Field, Schema};
18use crate::error::{Error, Result};
19use crate::segment::{SegmentId, SegmentReader};
20use crate::structures::BlockPostingList;
21use crate::structures::{CoarseCentroids, PQCodebook};
22
23#[cfg(feature = "native")]
24mod writer;
25#[cfg(feature = "native")]
26pub use writer::IndexWriter;
27
28#[cfg(feature = "native")]
29mod metadata;
30#[cfg(feature = "native")]
31pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
32
33#[cfg(feature = "native")]
34mod helpers;
35#[cfg(feature = "native")]
36pub use helpers::{
37    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
38    index_documents_from_reader, index_json_document, parse_schema,
39};
40
/// File name, relative to the index directory, under which the serialized
/// slice cache is stored and from which it is loaded again.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
43
44/// Index configuration
45#[derive(Debug, Clone)]
46pub struct IndexConfig {
47    /// Number of threads for CPU-intensive tasks (search parallelism)
48    pub num_threads: usize,
49    /// Number of parallel segment builders (documents distributed round-robin)
50    pub num_indexing_threads: usize,
51    /// Number of threads for parallel block compression within each segment
52    pub num_compression_threads: usize,
53    /// Block cache size for term dictionary per segment
54    pub term_cache_blocks: usize,
55    /// Block cache size for document store per segment
56    pub store_cache_blocks: usize,
57    /// Max documents per segment before auto-commit
58    pub max_docs_per_segment: u32,
59    /// Merge policy for background segment merging
60    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
61    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
62    pub optimization: crate::structures::IndexOptimization,
63}
64
65impl Default for IndexConfig {
66    fn default() -> Self {
67        #[cfg(feature = "native")]
68        let cpus = num_cpus::get().max(1);
69        #[cfg(not(feature = "native"))]
70        let cpus = 1;
71
72        Self {
73            num_threads: cpus,
74            num_indexing_threads: 1,
75            num_compression_threads: cpus,
76            term_cache_blocks: 256,
77            store_cache_blocks: 32,
78            max_docs_per_segment: 100_000,
79            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
80            optimization: crate::structures::IndexOptimization::default(),
81        }
82    }
83}
84
85/// Multi-segment async Index
86///
87/// The main entry point for searching. Manages multiple segments
88/// and provides unified search across all of them.
89pub struct Index<D: Directory> {
90    directory: Arc<D>,
91    schema: Arc<Schema>,
92    config: IndexConfig,
93    segments: RwLock<Vec<Arc<SegmentReader>>>,
94    default_fields: Vec<crate::Field>,
95    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
96    /// Cached global statistics for cross-segment IDF computation
97    global_stats: crate::query::GlobalStatsCache,
98    /// Index-level trained centroids per field (loaded from metadata)
99    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
100    /// Index-level trained PQ codebooks per field (for ScaNN)
101    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
102    #[cfg(feature = "native")]
103    thread_pool: Arc<rayon::ThreadPool>,
104}
105
106impl<D: Directory> Index<D> {
107    /// Open an existing index from a directory
108    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
109        let directory = Arc::new(directory);
110
111        // Read schema
112        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
113        let schema_bytes = schema_slice.read_bytes().await?;
114        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
115            .map_err(|e| Error::Serialization(e.to_string()))?;
116        let schema = Arc::new(schema);
117
118        // Load metadata and trained structures
119        let (segments, trained_centroids, trained_codebooks) =
120            Self::load_segments_and_trained(&directory, &schema, &config).await?;
121
122        #[cfg(feature = "native")]
123        let thread_pool = {
124            let pool = rayon::ThreadPoolBuilder::new()
125                .num_threads(config.num_threads)
126                .build()
127                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
128            Arc::new(pool)
129        };
130
131        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
132        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
133            schema.default_fields().to_vec()
134        } else {
135            schema
136                .fields()
137                .filter(|(_, entry)| {
138                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
139                })
140                .map(|(field, _)| field)
141                .collect()
142        };
143
144        Ok(Self {
145            directory,
146            schema,
147            config,
148            segments: RwLock::new(segments),
149            default_fields,
150            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
151            global_stats: crate::query::GlobalStatsCache::new(),
152            trained_centroids,
153            trained_codebooks,
154            #[cfg(feature = "native")]
155            thread_pool,
156        })
157    }
158
159    /// Load segments and trained structures from metadata
160    async fn load_segments_and_trained(
161        directory: &Arc<D>,
162        schema: &Arc<Schema>,
163        config: &IndexConfig,
164    ) -> Result<(
165        Vec<Arc<SegmentReader>>,
166        FxHashMap<u32, Arc<CoarseCentroids>>,
167        FxHashMap<u32, Arc<PQCodebook>>,
168    )> {
169        // Load metadata
170        let meta = Self::load_metadata(directory).await?;
171
172        // Load trained centroids and codebooks
173        let (trained_centroids, trained_codebooks) =
174            meta.load_trained_structures(directory.as_ref()).await;
175
176        // Load segments
177        let mut segments = Vec::new();
178        let mut doc_id_offset = 0u32;
179
180        for id_str in meta.segments {
181            let segment_id = SegmentId::from_hex(&id_str)
182                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
183            let reader = SegmentReader::open(
184                directory.as_ref(),
185                segment_id,
186                Arc::clone(schema),
187                doc_id_offset,
188                config.term_cache_blocks,
189            )
190            .await?;
191
192            doc_id_offset += reader.meta().num_docs;
193            segments.push(Arc::new(reader));
194        }
195
196        Ok((segments, trained_centroids, trained_codebooks))
197    }
198
199    /// Load metadata from metadata.json
200    async fn load_metadata(directory: &Arc<D>) -> Result<IndexMetadata> {
201        let meta_path = Path::new(INDEX_META_FILENAME);
202        if directory.exists(meta_path).await.unwrap_or(false) {
203            let slice = directory.open_read(meta_path).await?;
204            let bytes = slice.read_bytes().await?;
205            let meta: IndexMetadata = serde_json::from_slice(bytes.as_slice())
206                .map_err(|e| Error::Serialization(e.to_string()))?;
207            return Ok(meta);
208        }
209        Ok(IndexMetadata::new())
210    }
211
212    /// Get the schema
213    pub fn schema(&self) -> &Schema {
214        &self.schema
215    }
216
217    /// Get a reference to the underlying directory
218    pub fn directory(&self) -> &D {
219        &self.directory
220    }
221
222    /// Total number of documents across all segments
223    pub fn num_docs(&self) -> u32 {
224        self.segments.read().iter().map(|s| s.num_docs()).sum()
225    }
226
227    /// Get a document by global doc_id (async)
228    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
229        let segments = self.segments.read().clone();
230
231        let mut offset = 0u32;
232        for segment in segments.iter() {
233            let segment_docs = segment.meta().num_docs;
234            if doc_id < offset + segment_docs {
235                let local_doc_id = doc_id - offset;
236                return segment.doc(local_doc_id).await;
237            }
238            offset += segment_docs;
239        }
240
241        Ok(None)
242    }
243
244    /// Get posting lists for a term across all segments (async)
245    pub async fn get_postings(
246        &self,
247        field: Field,
248        term: &[u8],
249    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
250        let segments = self.segments.read().clone();
251        let mut results = Vec::new();
252
253        for segment in segments.iter() {
254            if let Some(postings) = segment.get_postings(field, term).await? {
255                results.push((Arc::clone(segment), postings));
256            }
257        }
258
259        Ok(results)
260    }
261
262    /// Execute CPU-intensive work on thread pool (native only)
263    #[cfg(feature = "native")]
264    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
265    where
266        F: FnOnce() -> R + Send + 'static,
267        R: Send + 'static,
268    {
269        let (tx, rx) = tokio::sync::oneshot::channel();
270        self.thread_pool.spawn(move || {
271            let result = f();
272            let _ = tx.send(result);
273        });
274        rx.await.expect("Thread pool task panicked")
275    }
276
277    /// Get segment readers for query execution
278    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
279        self.segments.read().clone()
280    }
281
282    /// Reload segments from directory (after new segments added)
283    /// Note: This only reloads segments, not trained structures (they are immutable once built)
284    pub async fn reload(&self) -> Result<()> {
285        let (new_segments, _, _) =
286            Self::load_segments_and_trained(&self.directory, &self.schema, &self.config).await?;
287        *self.segments.write() = new_segments;
288        // Invalidate global stats cache since segments changed
289        self.global_stats.invalidate();
290        Ok(())
291    }
292
293    /// Get global statistics for cross-segment IDF computation (sync, basic stats only)
294    ///
295    /// Returns cached stats if available. For full stats including term frequencies,
296    /// call `build_global_stats().await` first.
297    ///
298    /// This sync version only includes:
299    /// - Total docs
300    /// - Sparse vector dimension document frequencies
301    /// - Average field lengths
302    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
303        self.global_stats.get()
304    }
305
306    /// Build and cache global statistics (async, includes term frequencies)
307    ///
308    /// This iterates term dictionaries across all segments to compute
309    /// accurate cross-segment IDF values for full-text queries.
310    ///
311    /// Call this once after opening the index or after reload().
312    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
313        // Return cached if available
314        if let Some(stats) = self.global_stats.get() {
315            return Ok(stats);
316        }
317
318        let segments = self.segments.read().clone();
319        let schema = &self.schema;
320        let mut builder = crate::query::GlobalStatsBuilder::new();
321
322        // Track field length sums for computing global avg
323        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
324            rustc_hash::FxHashMap::default();
325
326        for segment in &segments {
327            let num_docs = segment.num_docs() as u64;
328            builder.total_docs += num_docs;
329
330            // Aggregate sparse vector statistics
331            for (&field_id, sparse_index) in segment.sparse_indexes() {
332                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
333                    if let Some(pl) = posting_list {
334                        builder.add_sparse_df(
335                            crate::dsl::Field(field_id),
336                            dim_id as u32,
337                            pl.doc_count() as u64,
338                        );
339                    }
340                }
341            }
342
343            // Aggregate text field average lengths
344            for (field, entry) in schema.fields() {
345                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
346                    let avg_len = segment.avg_field_len(field);
347                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
348                    *sum += (avg_len * num_docs as f32) as u64;
349                    *count += num_docs;
350                }
351            }
352
353            // Iterate term dictionary to get term document frequencies
354            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
355                builder.add_text_df(field, term, doc_freq as u64);
356            }
357        }
358
359        // Set global average field lengths
360        for (field_id, (sum, count)) in field_len_sums {
361            if count > 0 {
362                let global_avg = sum as f32 / count as f32;
363                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
364            }
365        }
366
367        let generation = self.global_stats.generation();
368        let stats = builder.build(generation);
369        self.global_stats.set_stats(stats);
370
371        Ok(self.global_stats.get().unwrap())
372    }
373
374    /// Search and return results with document addresses (no document content)
375    ///
376    /// This is the primary search method. Use `get_document` to fetch document content.
377    pub async fn search(
378        &self,
379        query: &dyn crate::query::Query,
380        limit: usize,
381    ) -> Result<crate::query::SearchResponse> {
382        self.search_offset(query, limit, 0).await
383    }
384
385    /// Search with offset for pagination
386    pub async fn search_offset(
387        &self,
388        query: &dyn crate::query::Query,
389        limit: usize,
390        offset: usize,
391    ) -> Result<crate::query::SearchResponse> {
392        self.search_internal(query, limit, offset, false).await
393    }
394
395    /// Search with matched field ordinals (for multi-valued fields with position tracking)
396    ///
397    /// Returns which array elements matched for each field with position tracking enabled.
398    pub async fn search_with_matched_fields(
399        &self,
400        query: &dyn crate::query::Query,
401        limit: usize,
402    ) -> Result<crate::query::SearchResponse> {
403        self.search_internal(query, limit, 0, true).await
404    }
405
406    async fn search_internal(
407        &self,
408        query: &dyn crate::query::Query,
409        limit: usize,
410        offset: usize,
411        collect_positions: bool,
412    ) -> Result<crate::query::SearchResponse> {
413        let segments = self.segments.read().clone();
414        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
415
416        // Fetch enough results to cover offset + limit
417        let fetch_limit = offset + limit;
418        for segment in &segments {
419            let segment_id = segment.meta().id;
420            let results = crate::query::search_segment_with_positions(
421                segment.as_ref(),
422                query,
423                fetch_limit,
424                collect_positions,
425            )
426            .await?;
427            for result in results {
428                all_results.push((segment_id, result));
429            }
430        }
431
432        // Sort by score descending
433        all_results.sort_by(|a, b| {
434            b.1.score
435                .partial_cmp(&a.1.score)
436                .unwrap_or(std::cmp::Ordering::Equal)
437        });
438
439        // Total hits before pagination
440        let total_hits = all_results.len() as u32;
441
442        // Apply offset and limit
443        let hits: Vec<crate::query::SearchHit> = all_results
444            .into_iter()
445            .skip(offset)
446            .take(limit)
447            .map(|(segment_id, result)| crate::query::SearchHit {
448                address: crate::query::DocAddress::new(segment_id, result.doc_id),
449                score: result.score,
450                matched_fields: result.extract_ordinals(),
451            })
452            .collect();
453
454        Ok(crate::query::SearchResponse { hits, total_hits })
455    }
456
457    /// Get a document by its unique address (segment_id + local doc_id)
458    pub async fn get_document(
459        &self,
460        address: &crate::query::DocAddress,
461    ) -> Result<Option<Document>> {
462        let segment_id = address
463            .segment_id_u128()
464            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
465
466        let segments = self.segments.read().clone();
467        for segment in &segments {
468            if segment.meta().id == segment_id {
469                return segment.doc(address.doc_id).await;
470            }
471        }
472
473        Ok(None)
474    }
475
476    /// Get the default fields for this index
477    pub fn default_fields(&self) -> &[crate::Field] {
478        &self.default_fields
479    }
480
481    /// Set the default fields for query parsing
482    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
483        self.default_fields = fields;
484    }
485
486    /// Get the tokenizer registry
487    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
488        &self.tokenizers
489    }
490
491    /// Create a query parser for this index
492    ///
493    /// If the schema contains query router rules, they will be used to route
494    /// queries to specific fields based on regex patterns.
495    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
496        // Check if schema has query routers
497        let query_routers = self.schema.query_routers();
498        if !query_routers.is_empty() {
499            // Try to create a router from the schema's rules
500            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
501                return crate::dsl::QueryLanguageParser::with_router(
502                    Arc::clone(&self.schema),
503                    self.default_fields.clone(),
504                    Arc::clone(&self.tokenizers),
505                    router,
506                );
507            }
508        }
509
510        // Fall back to parser without router
511        crate::dsl::QueryLanguageParser::new(
512            Arc::clone(&self.schema),
513            self.default_fields.clone(),
514            Arc::clone(&self.tokenizers),
515        )
516    }
517
518    /// Parse and search using a query string
519    ///
520    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
521    /// and simple text (tokenized and searched across default fields).
522    /// Returns document addresses (segment_id + doc_id) without document content.
523    pub async fn query(
524        &self,
525        query_str: &str,
526        limit: usize,
527    ) -> Result<crate::query::SearchResponse> {
528        self.query_offset(query_str, limit, 0).await
529    }
530
531    /// Query with offset for pagination
532    pub async fn query_offset(
533        &self,
534        query_str: &str,
535        limit: usize,
536        offset: usize,
537    ) -> Result<crate::query::SearchResponse> {
538        let parser = self.query_parser();
539        let query = parser.parse(query_str).map_err(Error::Query)?;
540        self.search_offset(query.as_ref(), limit, offset).await
541    }
542}
543
544/// Methods for opening index with slice caching
545impl<D: Directory> Index<SliceCachingDirectory<D>> {
546    /// Open an index with slice caching, automatically loading the cache file if present
547    ///
548    /// This wraps the directory in a SliceCachingDirectory and attempts to load
549    /// any existing slice cache file to prefill the cache with hot data.
550    pub async fn open_with_cache(
551        directory: D,
552        config: IndexConfig,
553        cache_max_bytes: usize,
554    ) -> Result<Self> {
555        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
556
557        // Try to load existing slice cache
558        let cache_path = Path::new(SLICE_CACHE_FILENAME);
559        if let Ok(true) = caching_dir.inner().exists(cache_path).await
560            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
561            && let Ok(bytes) = slice.read_bytes().await
562        {
563            let _ = caching_dir.deserialize(bytes.as_slice());
564        }
565
566        Self::open(caching_dir, config).await
567    }
568
569    /// Serialize the current slice cache to the index directory
570    ///
571    /// This saves all cached slices to a single file that can be loaded
572    /// on subsequent index opens for faster startup.
573    #[cfg(feature = "native")]
574    pub async fn save_slice_cache(&self) -> Result<()>
575    where
576        D: crate::directories::DirectoryWriter,
577    {
578        let cache_data = self.directory.serialize();
579        let cache_path = Path::new(SLICE_CACHE_FILENAME);
580        self.directory
581            .inner()
582            .write(cache_path, &cache_data)
583            .await?;
584        Ok(())
585    }
586
587    /// Get slice cache statistics
588    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
589        self.directory.stats()
590    }
591}
592
/// Opens an index through a fresh slice cache and writes that cache to the
/// index directory.
///
/// The directory is wrapped in a `SliceCachingDirectory` limited to
/// `cache_max_bytes`, the index is opened on top of it, and the cache is then
/// saved via `save_slice_cache` so later opens can pre-fill from the file.
///
/// No terms are iterated and no sample queries are run here; the only warm-up
/// is whatever opening the index itself reads. Additional warm-up could be
/// added between the open and the save.
///
/// # Errors
///
/// Returns an error if the index cannot be opened or the cache file cannot be
/// written.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let cached_directory = SliceCachingDirectory::new(directory, cache_max_bytes);
    let warmed_index = Index::open(cached_directory, config).await?;

    // Persist everything that opening the index pulled into the cache.
    warmed_index.save_slice_cache().await
}
619
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    /// Creates a second handle onto the same index.
    ///
    /// Directory, schema, tokenizers, segment readers, trained structures and
    /// the thread pool are shared through their `Arc`s. The clone gets its own
    /// lock around a snapshot of the current segment list, and starts with a
    /// new global-stats cache rather than a copy of this one.
    fn clone(&self) -> Self {
        let segments_snapshot = self.segments.read().clone();
        let fresh_stats_cache = crate::query::GlobalStatsCache::new();

        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(segments_snapshot),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            global_stats: fresh_stats_cache,
            trained_centroids: self.trained_centroids.clone(),
            trained_codebooks: self.trained_codebooks.clone(),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
637
638#[cfg(test)]
639mod tests {
640    use super::*;
641    use crate::directories::RamDirectory;
642    use crate::dsl::SchemaBuilder;
643
644    #[tokio::test]
645    async fn test_index_create_and_search() {
646        let mut schema_builder = SchemaBuilder::default();
647        let title = schema_builder.add_text_field("title", true, true);
648        let body = schema_builder.add_text_field("body", true, true);
649        let schema = schema_builder.build();
650
651        let dir = RamDirectory::new();
652        let config = IndexConfig::default();
653
654        // Create index and add documents
655        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
656            .await
657            .unwrap();
658
659        let mut doc1 = Document::new();
660        doc1.add_text(title, "Hello World");
661        doc1.add_text(body, "This is the first document");
662        writer.add_document(doc1).await.unwrap();
663
664        let mut doc2 = Document::new();
665        doc2.add_text(title, "Goodbye World");
666        doc2.add_text(body, "This is the second document");
667        writer.add_document(doc2).await.unwrap();
668
669        writer.commit().await.unwrap();
670
671        // Open for reading
672        let index = Index::open(dir, config).await.unwrap();
673        assert_eq!(index.num_docs(), 2);
674
675        // Check postings
676        let postings = index.get_postings(title, b"world").await.unwrap();
677        assert_eq!(postings.len(), 1); // One segment
678        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
679
680        // Retrieve document
681        let doc = index.doc(0).await.unwrap().unwrap();
682        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
683    }
684
685    #[tokio::test]
686    async fn test_multiple_segments() {
687        let mut schema_builder = SchemaBuilder::default();
688        let title = schema_builder.add_text_field("title", true, true);
689        let schema = schema_builder.build();
690
691        let dir = RamDirectory::new();
692        let config = IndexConfig {
693            max_docs_per_segment: 5, // Small segments for testing
694            ..Default::default()
695        };
696
697        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
698            .await
699            .unwrap();
700
701        // Add documents in batches to create multiple segments
702        for batch in 0..3 {
703            for i in 0..5 {
704                let mut doc = Document::new();
705                doc.add_text(title, format!("Document {} batch {}", i, batch));
706                writer.add_document(doc).await.unwrap();
707            }
708            writer.commit().await.unwrap();
709        }
710
711        // Open and check
712        let index = Index::open(dir, config).await.unwrap();
713        assert_eq!(index.num_docs(), 15);
714        assert_eq!(index.segment_readers().len(), 3);
715    }
716
717    #[tokio::test]
718    async fn test_segment_merge() {
719        let mut schema_builder = SchemaBuilder::default();
720        let title = schema_builder.add_text_field("title", true, true);
721        let schema = schema_builder.build();
722
723        let dir = RamDirectory::new();
724        let config = IndexConfig {
725            max_docs_per_segment: 3,
726            ..Default::default()
727        };
728
729        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
730            .await
731            .unwrap();
732
733        // Create multiple segments
734        for i in 0..9 {
735            let mut doc = Document::new();
736            doc.add_text(title, format!("Document {}", i));
737            writer.add_document(doc).await.unwrap();
738        }
739        writer.commit().await.unwrap();
740
741        // Should have 3 segments
742        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
743        assert_eq!(index.segment_readers().len(), 3);
744
745        // Force merge
746        let writer = IndexWriter::open(dir.clone(), config.clone())
747            .await
748            .unwrap();
749        writer.force_merge().await.unwrap();
750
751        // Should have 1 segment now
752        let index = Index::open(dir, config).await.unwrap();
753        assert_eq!(index.segment_readers().len(), 1);
754        assert_eq!(index.num_docs(), 9);
755
756        // Verify all documents accessible
757        for i in 0..9 {
758            let doc = index.doc(i).await.unwrap().unwrap();
759            assert_eq!(
760                doc.get_first(title).unwrap().as_text(),
761                Some(format!("Document {}", i).as_str())
762            );
763        }
764    }
765
766    #[tokio::test]
767    async fn test_match_query() {
768        let mut schema_builder = SchemaBuilder::default();
769        let title = schema_builder.add_text_field("title", true, true);
770        let body = schema_builder.add_text_field("body", true, true);
771        let schema = schema_builder.build();
772
773        let dir = RamDirectory::new();
774        let config = IndexConfig::default();
775
776        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
777            .await
778            .unwrap();
779
780        let mut doc1 = Document::new();
781        doc1.add_text(title, "rust programming");
782        doc1.add_text(body, "Learn rust language");
783        writer.add_document(doc1).await.unwrap();
784
785        let mut doc2 = Document::new();
786        doc2.add_text(title, "python programming");
787        doc2.add_text(body, "Learn python language");
788        writer.add_document(doc2).await.unwrap();
789
790        writer.commit().await.unwrap();
791
792        let index = Index::open(dir, config).await.unwrap();
793
794        // Test match query with multiple default fields
795        let results = index.query("rust", 10).await.unwrap();
796        assert_eq!(results.hits.len(), 1);
797
798        // Test match query with multiple tokens
799        let results = index.query("rust programming", 10).await.unwrap();
800        assert!(!results.hits.is_empty());
801
802        // Verify hit has address (segment_id + doc_id)
803        let hit = &results.hits[0];
804        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
805
806        // Verify document retrieval by address
807        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
808        assert!(
809            !doc.field_values().is_empty(),
810            "Doc should have field values"
811        );
812
813        // Also verify doc retrieval directly by global doc_id
814        let doc = index.doc(0).await.unwrap().unwrap();
815        assert!(
816            !doc.field_values().is_empty(),
817            "Doc should have field values"
818        );
819    }
820
821    #[tokio::test]
822    async fn test_slice_cache_warmup_and_load() {
823        use crate::directories::SliceCachingDirectory;
824
825        let mut schema_builder = SchemaBuilder::default();
826        let title = schema_builder.add_text_field("title", true, true);
827        let body = schema_builder.add_text_field("body", true, true);
828        let schema = schema_builder.build();
829
830        let dir = RamDirectory::new();
831        let config = IndexConfig::default();
832
833        // Create index with some documents
834        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
835            .await
836            .unwrap();
837
838        for i in 0..10 {
839            let mut doc = Document::new();
840            doc.add_text(title, format!("Document {} about rust", i));
841            doc.add_text(body, format!("This is body text number {}", i));
842            writer.add_document(doc).await.unwrap();
843        }
844        writer.commit().await.unwrap();
845
846        // Open with slice caching and perform some operations to warm up cache
847        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
848        let index = Index::open(caching_dir, config.clone()).await.unwrap();
849
850        // Perform a search to warm up the cache
851        let results = index.query("rust", 10).await.unwrap();
852        assert!(!results.hits.is_empty());
853
854        // Check cache stats - should have cached some data
855        let stats = index.slice_cache_stats();
856        assert!(stats.total_bytes > 0, "Cache should have data after search");
857
858        // Save the cache
859        index.save_slice_cache().await.unwrap();
860
861        // Verify cache file was written
862        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());
863
864        // Now open with cache loading
865        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
866            .await
867            .unwrap();
868
869        // Cache should be prefilled
870        let stats2 = index2.slice_cache_stats();
871        assert!(
872            stats2.total_bytes > 0,
873            "Cache should be prefilled from file"
874        );
875
876        // Search should still work
877        let results2 = index2.query("rust", 10).await.unwrap();
878        assert_eq!(results.hits.len(), results2.hits.len());
879    }
880
881    #[tokio::test]
882    async fn test_multivalue_field_indexing_and_search() {
883        let mut schema_builder = SchemaBuilder::default();
884        let uris = schema_builder.add_text_field("uris", true, true);
885        let title = schema_builder.add_text_field("title", true, true);
886        let schema = schema_builder.build();
887
888        let dir = RamDirectory::new();
889        let config = IndexConfig::default();
890
891        // Create index and add document with multi-value field
892        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
893            .await
894            .unwrap();
895
896        let mut doc = Document::new();
897        doc.add_text(uris, "one");
898        doc.add_text(uris, "two");
899        doc.add_text(title, "Test Document");
900        writer.add_document(doc).await.unwrap();
901
902        // Add another document with different uris
903        let mut doc2 = Document::new();
904        doc2.add_text(uris, "three");
905        doc2.add_text(title, "Another Document");
906        writer.add_document(doc2).await.unwrap();
907
908        writer.commit().await.unwrap();
909
910        // Open for reading
911        let index = Index::open(dir, config).await.unwrap();
912        assert_eq!(index.num_docs(), 2);
913
914        // Verify document retrieval preserves all values
915        let doc = index.doc(0).await.unwrap().unwrap();
916        let all_uris: Vec<_> = doc.get_all(uris).collect();
917        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
918        assert_eq!(all_uris[0].as_text(), Some("one"));
919        assert_eq!(all_uris[1].as_text(), Some("two"));
920
921        // Verify to_json returns array for multi-value field
922        let json = doc.to_json(index.schema());
923        let uris_json = json.get("uris").unwrap();
924        assert!(uris_json.is_array(), "Multi-value field should be an array");
925        let uris_arr = uris_json.as_array().unwrap();
926        assert_eq!(uris_arr.len(), 2);
927        assert_eq!(uris_arr[0].as_str(), Some("one"));
928        assert_eq!(uris_arr[1].as_str(), Some("two"));
929
930        // Verify both values are searchable
931        let results = index.query("uris:one", 10).await.unwrap();
932        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
933        assert_eq!(results.hits[0].address.doc_id, 0);
934
935        let results = index.query("uris:two", 10).await.unwrap();
936        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
937        assert_eq!(results.hits[0].address.doc_id, 0);
938
939        let results = index.query("uris:three", 10).await.unwrap();
940        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
941        assert_eq!(results.hits[0].address.doc_id, 1);
942
943        // Verify searching for non-existent value returns no results
944        let results = index.query("uris:nonexistent", 10).await.unwrap();
945        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
946    }
947
948    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
949    ///
950    /// This test verifies that:
951    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
952    /// 2. Search results are correct regardless of WAND optimization
953    /// 3. Scores are reasonable for matching documents
954    #[tokio::test]
955    async fn test_wand_optimization_for_or_queries() {
956        use crate::query::{BooleanQuery, TermQuery};
957
958        let mut schema_builder = SchemaBuilder::default();
959        let content = schema_builder.add_text_field("content", true, true);
960        let schema = schema_builder.build();
961
962        let dir = RamDirectory::new();
963        let config = IndexConfig::default();
964
965        // Create index with documents containing various terms
966        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
967            .await
968            .unwrap();
969
970        // Doc 0: contains "rust" and "programming"
971        let mut doc = Document::new();
972        doc.add_text(content, "rust programming language is fast");
973        writer.add_document(doc).await.unwrap();
974
975        // Doc 1: contains "rust" only
976        let mut doc = Document::new();
977        doc.add_text(content, "rust is a systems language");
978        writer.add_document(doc).await.unwrap();
979
980        // Doc 2: contains "programming" only
981        let mut doc = Document::new();
982        doc.add_text(content, "programming is fun");
983        writer.add_document(doc).await.unwrap();
984
985        // Doc 3: contains "python" (neither rust nor programming)
986        let mut doc = Document::new();
987        doc.add_text(content, "python is easy to learn");
988        writer.add_document(doc).await.unwrap();
989
990        // Doc 4: contains both "rust" and "programming" multiple times
991        let mut doc = Document::new();
992        doc.add_text(content, "rust rust programming programming systems");
993        writer.add_document(doc).await.unwrap();
994
995        writer.commit().await.unwrap();
996
997        // Open for reading
998        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
999
1000        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
1001        let or_query = BooleanQuery::new()
1002            .should(TermQuery::text(content, "rust"))
1003            .should(TermQuery::text(content, "programming"));
1004
1005        let results = index.search(&or_query, 10).await.unwrap();
1006
1007        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
1008        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
1009
1010        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1011        assert!(doc_ids.contains(&0), "Should find doc 0");
1012        assert!(doc_ids.contains(&1), "Should find doc 1");
1013        assert!(doc_ids.contains(&2), "Should find doc 2");
1014        assert!(doc_ids.contains(&4), "Should find doc 4");
1015        assert!(
1016            !doc_ids.contains(&3),
1017            "Should NOT find doc 3 (only has 'python')"
1018        );
1019
1020        // Test 2: Single term query (should NOT use WAND, but still work)
1021        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
1022
1023        let results = index.search(&single_query, 10).await.unwrap();
1024        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1025
1026        // Test 3: Query with MUST (should NOT use WAND)
1027        let must_query = BooleanQuery::new()
1028            .must(TermQuery::text(content, "rust"))
1029            .should(TermQuery::text(content, "programming"));
1030
1031        let results = index.search(&must_query, 10).await.unwrap();
1032        // Must have "rust", optionally "programming"
1033        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1034
1035        // Test 4: Query with MUST_NOT (should NOT use WAND)
1036        let must_not_query = BooleanQuery::new()
1037            .should(TermQuery::text(content, "rust"))
1038            .should(TermQuery::text(content, "programming"))
1039            .must_not(TermQuery::text(content, "systems"));
1040
1041        let results = index.search(&must_not_query, 10).await.unwrap();
1042        // Should exclude docs with "systems" (doc 1 and 4)
1043        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1044        assert!(
1045            !doc_ids.contains(&1),
1046            "Should NOT find doc 1 (has 'systems')"
1047        );
1048        assert!(
1049            !doc_ids.contains(&4),
1050            "Should NOT find doc 4 (has 'systems')"
1051        );
1052
1053        // Test 5: Verify top-k limit works correctly with WAND
1054        let or_query = BooleanQuery::new()
1055            .should(TermQuery::text(content, "rust"))
1056            .should(TermQuery::text(content, "programming"));
1057
1058        let results = index.search(&or_query, 2).await.unwrap();
1059        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
1060
1061        // Top results should be docs that match both terms (higher scores)
1062        // Doc 0 and 4 contain both "rust" and "programming"
1063    }
1064
1065    /// Test that WAND optimization produces same results as non-WAND for correctness
1066    #[tokio::test]
1067    async fn test_wand_results_match_standard_boolean() {
1068        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
1069
1070        let mut schema_builder = SchemaBuilder::default();
1071        let content = schema_builder.add_text_field("content", true, true);
1072        let schema = schema_builder.build();
1073
1074        let dir = RamDirectory::new();
1075        let config = IndexConfig::default();
1076
1077        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1078            .await
1079            .unwrap();
1080
1081        // Add several documents
1082        for i in 0..10 {
1083            let mut doc = Document::new();
1084            let text = match i % 4 {
1085                0 => "apple banana cherry",
1086                1 => "apple orange",
1087                2 => "banana grape",
1088                _ => "cherry date",
1089            };
1090            doc.add_text(content, text);
1091            writer.add_document(doc).await.unwrap();
1092        }
1093
1094        writer.commit().await.unwrap();
1095        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1096
1097        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
1098        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
1099
1100        let bool_query = BooleanQuery::new()
1101            .should(TermQuery::text(content, "apple"))
1102            .should(TermQuery::text(content, "banana"));
1103
1104        let wand_results = index.search(&wand_query, 10).await.unwrap();
1105        let bool_results = index.search(&bool_query, 10).await.unwrap();
1106
1107        // Both should find the same documents
1108        assert_eq!(
1109            wand_results.hits.len(),
1110            bool_results.hits.len(),
1111            "WAND and Boolean should find same number of docs"
1112        );
1113
1114        let wand_docs: std::collections::HashSet<u32> =
1115            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
1116        let bool_docs: std::collections::HashSet<u32> =
1117            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
1118
1119        assert_eq!(
1120            wand_docs, bool_docs,
1121            "WAND and Boolean should find same documents"
1122        );
1123    }
1124
1125    #[tokio::test]
1126    async fn test_vector_index_threshold_switch() {
1127        use crate::dsl::{DenseVectorConfig, VectorIndexType};
1128
1129        // Create schema with dense vector field configured for IVF-RaBitQ
1130        let mut schema_builder = SchemaBuilder::default();
1131        let title = schema_builder.add_text_field("title", true, true);
1132        let embedding = schema_builder.add_dense_vector_field_with_config(
1133            "embedding",
1134            true, // indexed
1135            true, // stored
1136            DenseVectorConfig {
1137                dim: 8,
1138                index_type: VectorIndexType::IvfRaBitQ,
1139                store_raw: true,
1140                num_clusters: Some(4), // Small for test
1141                nprobe: 2,
1142                mrl_dim: None,
1143                build_threshold: Some(50), // Build when we have 50+ vectors
1144            },
1145        );
1146        let schema = schema_builder.build();
1147
1148        let dir = RamDirectory::new();
1149        let config = IndexConfig::default();
1150
1151        // Phase 1: Add vectors below threshold (should use Flat index)
1152        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1153            .await
1154            .unwrap();
1155
1156        // Add 30 documents (below threshold of 50)
1157        for i in 0..30 {
1158            let mut doc = Document::new();
1159            doc.add_text(title, format!("Document {}", i));
1160            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
1161            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
1162            doc.add_dense_vector(embedding, vec);
1163            writer.add_document(doc).await.unwrap();
1164        }
1165        writer.commit().await.unwrap();
1166
1167        // Open index and verify it's using Flat (not built yet)
1168        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1169        assert!(
1170            index.trained_centroids.is_empty(),
1171            "Should not have trained centroids below threshold"
1172        );
1173
1174        // Search should work with Flat index
1175        let query_vec: Vec<f32> = vec![0.5; 8];
1176        let segments = index.segment_readers();
1177        assert!(!segments.is_empty());
1178
1179        let results = segments[0]
1180            .search_dense_vector(
1181                embedding,
1182                &query_vec,
1183                5,
1184                1,
1185                crate::query::MultiValueCombiner::Max,
1186            )
1187            .unwrap();
1188        assert!(!results.is_empty(), "Flat search should return results");
1189
1190        // Phase 2: Add more vectors to cross threshold
1191        let writer = IndexWriter::open(dir.clone(), config.clone())
1192            .await
1193            .unwrap();
1194
1195        // Add 30 more documents (total 60, above threshold of 50)
1196        for i in 30..60 {
1197            let mut doc = Document::new();
1198            doc.add_text(title, format!("Document {}", i));
1199            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
1200            doc.add_dense_vector(embedding, vec);
1201            writer.add_document(doc).await.unwrap();
1202        }
1203        // Commit auto-triggers vector index build when threshold is crossed
1204        writer.commit().await.unwrap();
1205
1206        // Verify centroids were trained (auto-triggered)
1207        assert!(
1208            writer.is_vector_index_built(embedding).await,
1209            "Vector index should be built after crossing threshold"
1210        );
1211
1212        // Reopen index and verify trained structures are loaded
1213        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1214        assert!(
1215            index.trained_centroids.contains_key(&embedding.0),
1216            "Should have loaded trained centroids for embedding field"
1217        );
1218
1219        // Search should still work
1220        let segments = index.segment_readers();
1221        let results = segments[0]
1222            .search_dense_vector(
1223                embedding,
1224                &query_vec,
1225                5,
1226                1,
1227                crate::query::MultiValueCombiner::Max,
1228            )
1229            .unwrap();
1230        assert!(
1231            !results.is_empty(),
1232            "Search should return results after build"
1233        );
1234
1235        // Phase 3: Verify calling build_vector_index again is a no-op
1236        let writer = IndexWriter::open(dir.clone(), config.clone())
1237            .await
1238            .unwrap();
1239        writer.build_vector_index().await.unwrap(); // Should skip training
1240
1241        // Still built
1242        assert!(writer.is_vector_index_built(embedding).await);
1243    }
1244}