Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! Components:
4//! - Index: main entry point for searching
5//! - IndexWriter: for adding documents and committing segments (native only)
6//! - Supports multiple segments with merge
7
8use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16use crate::directories::{Directory, SliceCachingDirectory};
17use crate::dsl::{Document, Field, Schema};
18use crate::error::{Error, Result};
19use crate::segment::{SegmentId, SegmentReader};
20use crate::structures::BlockPostingList;
21use crate::structures::{CoarseCentroids, PQCodebook};
22
23#[cfg(feature = "native")]
24mod vector_builder;
25#[cfg(feature = "native")]
26mod writer;
27#[cfg(feature = "native")]
28pub use writer::IndexWriter;
29
30mod metadata;
31pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
32
33#[cfg(feature = "native")]
34mod helpers;
35#[cfg(feature = "native")]
36pub use helpers::{
37    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
38    index_documents_from_reader, index_json_document, parse_schema,
39};
40
/// Default file name for the slice cache
///
/// Resolved relative to the index directory: it is the path read when opening an
/// index with a slice cache and the path written when the cache is saved.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
43
/// Index configuration
///
/// Bundles the threading, caching, memory and merge settings used when opening
/// or writing an index. See the `Default` impl for the default values.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism).
    ///
    /// In native builds this sizes the rayon thread pool created by `Index::open`.
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment.
    ///
    /// Passed to `SegmentReader::open` for every segment that is loaded.
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max memory (bytes) across all builders before auto-commit (global limit)
    pub max_indexing_memory_bytes: usize,
    /// Merge policy for background segment merging
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
}
64
65impl Default for IndexConfig {
66    fn default() -> Self {
67        #[cfg(feature = "native")]
68        let cpus = num_cpus::get().max(1);
69        #[cfg(not(feature = "native"))]
70        let cpus = 1;
71
72        Self {
73            num_threads: cpus,
74            num_indexing_threads: 1,
75            num_compression_threads: cpus,
76            term_cache_blocks: 256,
77            store_cache_blocks: 32,
78            max_indexing_memory_bytes: 2 * 1024 * 1024 * 1024, // 256 MB default
79            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
80            optimization: crate::structures::IndexOptimization::default(),
81        }
82    }
83}
84
/// Multi-segment async Index
///
/// The main entry point for searching. Manages multiple segments
/// and provides unified search across all of them.
pub struct Index<D: Directory> {
    /// Storage backend, shared with clones of this index
    directory: Arc<D>,
    /// Schema deserialized from `schema.json` when the index was opened
    schema: Arc<Schema>,
    /// Configuration supplied by the caller at open time
    config: IndexConfig,
    /// Currently loaded segment readers; replaced wholesale by `reload()`
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    /// Fields searched by default: the schema's default fields, or every
    /// indexed text field when the schema declares none
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry handed to query parsers
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Cached global statistics for cross-segment IDF computation
    global_stats: crate::query::GlobalStatsCache,
    /// Index-level trained centroids per field (loaded from metadata)
    #[allow(dead_code)] // Used in native builds for clone_for_builder
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Index-level trained PQ codebooks per field (for ScaNN)
    #[allow(dead_code)] // Used in native builds for clone_for_builder
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Rayon pool for CPU-intensive work, built with `config.num_threads` threads
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
107
108impl<D: Directory> Index<D> {
109    /// Open an existing index from a directory
110    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
111        let directory = Arc::new(directory);
112
113        // Read schema
114        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
115        let schema_bytes = schema_slice.read_bytes().await?;
116        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
117            .map_err(|e| Error::Serialization(e.to_string()))?;
118        let schema = Arc::new(schema);
119
120        // Load metadata and trained structures
121        let (segments, trained_centroids, trained_codebooks) =
122            Self::load_segments_and_trained(&directory, &schema, &config).await?;
123
124        #[cfg(feature = "native")]
125        let thread_pool = {
126            let pool = rayon::ThreadPoolBuilder::new()
127                .num_threads(config.num_threads)
128                .build()
129                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
130            Arc::new(pool)
131        };
132
133        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
134        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
135            schema.default_fields().to_vec()
136        } else {
137            schema
138                .fields()
139                .filter(|(_, entry)| {
140                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
141                })
142                .map(|(field, _)| field)
143                .collect()
144        };
145
146        Ok(Self {
147            directory,
148            schema,
149            config,
150            segments: RwLock::new(segments),
151            default_fields,
152            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
153            global_stats: crate::query::GlobalStatsCache::new(),
154            trained_centroids,
155            trained_codebooks,
156            #[cfg(feature = "native")]
157            thread_pool,
158        })
159    }
160
161    /// Load segments and trained structures from metadata
162    async fn load_segments_and_trained(
163        directory: &Arc<D>,
164        schema: &Arc<Schema>,
165        config: &IndexConfig,
166    ) -> Result<(
167        Vec<Arc<SegmentReader>>,
168        FxHashMap<u32, Arc<CoarseCentroids>>,
169        FxHashMap<u32, Arc<PQCodebook>>,
170    )> {
171        // Load metadata
172        let meta = Self::load_metadata(directory).await?;
173
174        // Load trained centroids and codebooks
175        let (trained_centroids, trained_codebooks) =
176            meta.load_trained_structures(directory.as_ref()).await;
177
178        // Load segments
179        let mut segments = Vec::new();
180        let mut doc_id_offset = 0u32;
181
182        for id_str in meta.segments {
183            let segment_id = SegmentId::from_hex(&id_str)
184                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
185            let reader = SegmentReader::open(
186                directory.as_ref(),
187                segment_id,
188                Arc::clone(schema),
189                doc_id_offset,
190                config.term_cache_blocks,
191            )
192            .await?;
193
194            doc_id_offset += reader.meta().num_docs;
195            segments.push(Arc::new(reader));
196        }
197
198        Ok((segments, trained_centroids, trained_codebooks))
199    }
200
201    /// Load metadata from metadata.json
202    async fn load_metadata(directory: &Arc<D>) -> Result<IndexMetadata> {
203        let meta_path = Path::new(INDEX_META_FILENAME);
204        if directory.exists(meta_path).await.unwrap_or(false) {
205            let slice = directory.open_read(meta_path).await?;
206            let bytes = slice.read_bytes().await?;
207            let meta: IndexMetadata = serde_json::from_slice(bytes.as_slice())
208                .map_err(|e| Error::Serialization(e.to_string()))?;
209            return Ok(meta);
210        }
211        Ok(IndexMetadata::new())
212    }
213
    /// Get the schema
    ///
    /// Borrows the schema that was deserialized when the index was opened.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get a reference to the underlying directory
    ///
    /// The directory is held in an `Arc`; this borrows through it without cloning.
    pub fn directory(&self) -> &D {
        &self.directory
    }
223
224    /// Total number of documents across all segments
225    pub fn num_docs(&self) -> u32 {
226        self.segments.read().iter().map(|s| s.num_docs()).sum()
227    }
228
229    /// Get a document by global doc_id (async)
230    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
231        let segments = self.segments.read().clone();
232
233        let mut offset = 0u32;
234        for segment in segments.iter() {
235            let segment_docs = segment.meta().num_docs;
236            if doc_id < offset + segment_docs {
237                let local_doc_id = doc_id - offset;
238                return segment.doc(local_doc_id).await;
239            }
240            offset += segment_docs;
241        }
242
243        Ok(None)
244    }
245
246    /// Get posting lists for a term across all segments (async)
247    pub async fn get_postings(
248        &self,
249        field: Field,
250        term: &[u8],
251    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
252        let segments = self.segments.read().clone();
253        let mut results = Vec::new();
254
255        for segment in segments.iter() {
256            if let Some(postings) = segment.get_postings(field, term).await? {
257                results.push((Arc::clone(segment), postings));
258            }
259        }
260
261        Ok(results)
262    }
263
264    /// Execute CPU-intensive work on thread pool (native only)
265    #[cfg(feature = "native")]
266    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
267    where
268        F: FnOnce() -> R + Send + 'static,
269        R: Send + 'static,
270    {
271        let (tx, rx) = tokio::sync::oneshot::channel();
272        self.thread_pool.spawn(move || {
273            let result = f();
274            let _ = tx.send(result);
275        });
276        rx.await.expect("Thread pool task panicked")
277    }
278
    /// Get segment readers for query execution
    ///
    /// Returns a snapshot: the vector of `Arc`s is cloned under the read lock,
    /// so a later `reload()` does not affect the returned readers.
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }
283
284    /// Reload segments from directory (after new segments added or merges completed)
285    ///
286    /// This reads the current metadata.json and loads only segments that exist.
287    /// Safe to call at any time - will not reference deleted segments.
288    ///
289    /// Note: This only reloads segments, not trained structures (they are immutable once built)
290    pub async fn reload(&self) -> Result<()> {
291        // Load fresh metadata from disk
292        let meta = Self::load_metadata(&self.directory).await?;
293
294        // Load only segments that exist in current metadata
295        let mut new_segments = Vec::new();
296        let mut doc_id_offset = 0u32;
297
298        for id_str in meta.segments {
299            let segment_id = match SegmentId::from_hex(&id_str) {
300                Some(id) => id,
301                None => {
302                    log::warn!("Invalid segment ID in metadata: {}", id_str);
303                    continue;
304                }
305            };
306
307            // Try to open segment - skip if files don't exist (may have been deleted by merge)
308            match SegmentReader::open(
309                self.directory.as_ref(),
310                segment_id,
311                Arc::clone(&self.schema),
312                doc_id_offset,
313                self.config.term_cache_blocks,
314            )
315            .await
316            {
317                Ok(reader) => {
318                    doc_id_offset += reader.meta().num_docs;
319                    new_segments.push(Arc::new(reader));
320                }
321                Err(e) => {
322                    // Segment files may have been deleted by concurrent merge - skip it
323                    log::warn!(
324                        "Could not open segment {}: {:?} (may have been merged)",
325                        id_str,
326                        e
327                    );
328                }
329            }
330        }
331
332        *self.segments.write() = new_segments;
333        // Invalidate global stats cache since segments changed
334        self.global_stats.invalidate();
335        Ok(())
336    }
337
338    /// Check if segments need reloading (metadata changed since last load)
339    pub async fn needs_reload(&self) -> Result<bool> {
340        let meta = Self::load_metadata(&self.directory).await?;
341        let current_segments = self.segments.read();
342
343        // Compare segment count and IDs
344        if meta.segments.len() != current_segments.len() {
345            return Ok(true);
346        }
347
348        for (meta_id, reader) in meta.segments.iter().zip(current_segments.iter()) {
349            let reader_id = SegmentId::from_u128(reader.meta().id).to_hex();
350            if meta_id != &reader_id {
351                return Ok(true);
352            }
353        }
354
355        Ok(false)
356    }
357
    /// Get global statistics for cross-segment IDF computation (sync, basic stats only)
    ///
    /// Returns cached stats if available. For full stats including term frequencies,
    /// call `build_global_stats().await` first.
    ///
    /// This sync version only includes:
    /// - Total docs
    /// - Sparse vector dimension document frequencies
    /// - Average field lengths
    ///
    /// NOTE(review): the body only reads the cache and computes nothing itself,
    /// so the result is `None` until something has populated it — confirm the
    /// "basic stats only" description above against `GlobalStatsCache`.
    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
        self.global_stats.get()
    }
370
371    /// Build and cache global statistics (async, includes term frequencies)
372    ///
373    /// This iterates term dictionaries across all segments to compute
374    /// accurate cross-segment IDF values for full-text queries.
375    ///
376    /// Call this once after opening the index or after reload().
377    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
378        // Return cached if available
379        if let Some(stats) = self.global_stats.get() {
380            return Ok(stats);
381        }
382
383        let segments = self.segments.read().clone();
384        let schema = &self.schema;
385        let mut builder = crate::query::GlobalStatsBuilder::new();
386
387        // Track field length sums for computing global avg
388        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
389            rustc_hash::FxHashMap::default();
390
391        for segment in &segments {
392            let num_docs = segment.num_docs() as u64;
393            builder.total_docs += num_docs;
394
395            // Aggregate sparse vector statistics
396            for (&field_id, sparse_index) in segment.sparse_indexes() {
397                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
398                    if let Some(pl) = posting_list {
399                        builder.add_sparse_df(
400                            crate::dsl::Field(field_id),
401                            dim_id as u32,
402                            pl.doc_count() as u64,
403                        );
404                    }
405                }
406            }
407
408            // Aggregate text field average lengths
409            for (field, entry) in schema.fields() {
410                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
411                    let avg_len = segment.avg_field_len(field);
412                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
413                    *sum += (avg_len * num_docs as f32) as u64;
414                    *count += num_docs;
415                }
416            }
417
418            // Iterate term dictionary to get term document frequencies
419            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
420                builder.add_text_df(field, term, doc_freq as u64);
421            }
422        }
423
424        // Set global average field lengths
425        for (field_id, (sum, count)) in field_len_sums {
426            if count > 0 {
427                let global_avg = sum as f32 / count as f32;
428                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
429            }
430        }
431
432        let generation = self.global_stats.generation();
433        let stats = builder.build(generation);
434        self.global_stats.set_stats(stats);
435
436        Ok(self.global_stats.get().unwrap())
437    }
438
    /// Search and return results with document addresses (no document content)
    ///
    /// This is the primary search method. Use `get_document` to fetch document content.
    ///
    /// Equivalent to `search_offset(query, limit, 0)`.
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Search with offset for pagination
    ///
    /// Skips the first `offset` hits of the merged, score-sorted result list and
    /// returns at most `limit` hits. Position collection is disabled.
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Search with matched field ordinals (for multi-valued fields with position tracking)
    ///
    /// Returns which array elements matched for each field with position tracking enabled.
    ///
    /// Always starts at offset 0; there is no paginated variant of this method here.
    pub async fn search_with_matched_fields(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, 0, true).await
    }
470
471    async fn search_internal(
472        &self,
473        query: &dyn crate::query::Query,
474        limit: usize,
475        offset: usize,
476        collect_positions: bool,
477    ) -> Result<crate::query::SearchResponse> {
478        let segments = self.segments.read().clone();
479        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
480
481        // Fetch enough results to cover offset + limit
482        let fetch_limit = offset + limit;
483        for segment in &segments {
484            let segment_id = segment.meta().id;
485            let results = if collect_positions {
486                crate::query::search_segment_with_positions(segment.as_ref(), query, fetch_limit)
487                    .await?
488            } else {
489                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?
490            };
491            for result in results {
492                all_results.push((segment_id, result));
493            }
494        }
495
496        // Sort by score descending
497        all_results.sort_by(|a, b| {
498            b.1.score
499                .partial_cmp(&a.1.score)
500                .unwrap_or(std::cmp::Ordering::Equal)
501        });
502
503        // Total hits before pagination
504        let total_hits = all_results.len() as u32;
505
506        // Apply offset and limit
507        let hits: Vec<crate::query::SearchHit> = all_results
508            .into_iter()
509            .skip(offset)
510            .take(limit)
511            .map(|(segment_id, result)| crate::query::SearchHit {
512                address: crate::query::DocAddress::new(segment_id, result.doc_id),
513                score: result.score,
514                matched_fields: result.extract_ordinals(),
515            })
516            .collect();
517
518        Ok(crate::query::SearchResponse { hits, total_hits })
519    }
520
521    /// Get a document by its unique address (segment_id + local doc_id)
522    pub async fn get_document(
523        &self,
524        address: &crate::query::DocAddress,
525    ) -> Result<Option<Document>> {
526        let segment_id = address
527            .segment_id_u128()
528            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
529
530        let segments = self.segments.read().clone();
531        for segment in &segments {
532            if segment.meta().id == segment_id {
533                return segment.doc(address.doc_id).await;
534            }
535        }
536
537        Ok(None)
538    }
539
    /// Get the default fields for this index
    ///
    /// These are the fields handed to query parsers created by this index.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Set the default fields for query parsing
    ///
    /// Replaces the whole list; parsers created afterwards use the new fields.
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    /// Get the tokenizer registry
    ///
    /// The registry is shared (`Arc`) with query parsers created by this index.
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }
554
555    /// Create a query parser for this index
556    ///
557    /// If the schema contains query router rules, they will be used to route
558    /// queries to specific fields based on regex patterns.
559    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
560        // Check if schema has query routers
561        let query_routers = self.schema.query_routers();
562        if !query_routers.is_empty() {
563            // Try to create a router from the schema's rules
564            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
565                return crate::dsl::QueryLanguageParser::with_router(
566                    Arc::clone(&self.schema),
567                    self.default_fields.clone(),
568                    Arc::clone(&self.tokenizers),
569                    router,
570                );
571            }
572        }
573
574        // Fall back to parser without router
575        crate::dsl::QueryLanguageParser::new(
576            Arc::clone(&self.schema),
577            self.default_fields.clone(),
578            Arc::clone(&self.tokenizers),
579        )
580    }
581
    /// Parse and search using a query string
    ///
    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
    /// and simple text (tokenized and searched across default fields).
    /// Returns document addresses (segment_id + doc_id) without document content.
    ///
    /// Equivalent to `query_offset(query_str, limit, 0)`.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
594
595    /// Query with offset for pagination
596    pub async fn query_offset(
597        &self,
598        query_str: &str,
599        limit: usize,
600        offset: usize,
601    ) -> Result<crate::query::SearchResponse> {
602        let parser = self.query_parser();
603        let query = parser.parse(query_str).map_err(Error::Query)?;
604        self.search_offset(query.as_ref(), limit, offset).await
605    }
606}
607
608/// Methods for opening index with slice caching
609impl<D: Directory> Index<SliceCachingDirectory<D>> {
610    /// Open an index with slice caching, automatically loading the cache file if present
611    ///
612    /// This wraps the directory in a SliceCachingDirectory and attempts to load
613    /// any existing slice cache file to prefill the cache with hot data.
614    pub async fn open_with_cache(
615        directory: D,
616        config: IndexConfig,
617        cache_max_bytes: usize,
618    ) -> Result<Self> {
619        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
620
621        // Try to load existing slice cache
622        let cache_path = Path::new(SLICE_CACHE_FILENAME);
623        if let Ok(true) = caching_dir.inner().exists(cache_path).await
624            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
625            && let Ok(bytes) = slice.read_bytes().await
626        {
627            let _ = caching_dir.deserialize(bytes.as_slice());
628        }
629
630        Self::open(caching_dir, config).await
631    }
632
633    /// Serialize the current slice cache to the index directory
634    ///
635    /// This saves all cached slices to a single file that can be loaded
636    /// on subsequent index opens for faster startup.
637    #[cfg(feature = "native")]
638    pub async fn save_slice_cache(&self) -> Result<()>
639    where
640        D: crate::directories::DirectoryWriter,
641    {
642        let cache_data = self.directory.serialize();
643        let cache_path = Path::new(SLICE_CACHE_FILENAME);
644        self.directory
645            .inner()
646            .write(cache_path, &cache_data)
647            .await?;
648        Ok(())
649    }
650
651    /// Get slice cache statistics
652    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
653        self.directory.stats()
654    }
655}
656
/// Warm up the slice cache by opening an index and saving what was read
///
/// Opens the index through a `SliceCachingDirectory` limited to
/// `cache_max_bytes` and then serializes the cache to the slice cache file.
/// No sample queries are run: the cache contains only what opening the index
/// read. Iterating terms or running sample queries before saving would extend
/// the warmup.
///
/// # Errors
///
/// Propagates failures from opening the index or writing the cache file.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    // Opening the index is the warmup step.
    let index = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;

    // Persist whatever the open populated.
    index.save_slice_cache().await
}
683
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    /// Clone the index handle.
    ///
    /// The directory, schema, tokenizers and thread pool are shared via `Arc`.
    /// The segment list is copied as a snapshot behind a new lock, so a later
    /// `reload()` on one handle does not affect the other. The clone starts
    /// with an empty global stats cache.
    fn clone(&self) -> Self {
        let segments_snapshot = self.segments.read().clone();

        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(segments_snapshot),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            // Statistics are rebuilt on demand rather than copied.
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids: self.trained_centroids.clone(),
            trained_codebooks: self.trained_codebooks.clone(),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705    use crate::directories::RamDirectory;
706    use crate::dsl::SchemaBuilder;
707
    /// End-to-end smoke test: write two documents, commit, reopen the index
    /// for reading, and check document count, postings, and document retrieval.
    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        // NOTE(review): the two boolean flags are presumably indexed/stored — confirm against SchemaBuilder.
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Check postings (the lookup uses the lowercase term "world")
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document by global doc id 0
        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }
748
    /// Commits three batches of five documents under a tiny memory budget and
    /// checks that all 15 documents are visible across at least two segments.
    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 1024, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add documents in batches to create multiple segments
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            // One commit per batch
            writer.commit().await.unwrap();
        }

        // Open and check
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        // With queue-based indexing, exact segment count varies
        assert!(
            index.segment_readers().len() >= 2,
            "Expected multiple segments"
        );
    }
784
    /// Builds several segments via explicit flushes, force-merges them, and
    /// checks that a single segment remains with all nine documents retrievable.
    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_indexing_memory_bytes: 512, // Very small to trigger frequent flushes
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments by flushing between batches
        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        // Should have multiple segments (at least 2, one per flush with docs)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().len() >= 2,
            "Expected multiple segments"
        );

        // Force merge (through a writer reopened on the same directory)
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Verify all documents accessible (order may vary with queue-based indexing)
        let mut found_docs = 0;
        for i in 0..9 {
            if index.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }
839
840    #[tokio::test]
841    async fn test_match_query() {
842        let mut schema_builder = SchemaBuilder::default();
843        let title = schema_builder.add_text_field("title", true, true);
844        let body = schema_builder.add_text_field("body", true, true);
845        let schema = schema_builder.build();
846
847        let dir = RamDirectory::new();
848        let config = IndexConfig::default();
849
850        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
851            .await
852            .unwrap();
853
854        let mut doc1 = Document::new();
855        doc1.add_text(title, "rust programming");
856        doc1.add_text(body, "Learn rust language");
857        writer.add_document(doc1).unwrap();
858
859        let mut doc2 = Document::new();
860        doc2.add_text(title, "python programming");
861        doc2.add_text(body, "Learn python language");
862        writer.add_document(doc2).unwrap();
863
864        writer.commit().await.unwrap();
865
866        let index = Index::open(dir, config).await.unwrap();
867
868        // Test match query with multiple default fields
869        let results = index.query("rust", 10).await.unwrap();
870        assert_eq!(results.hits.len(), 1);
871
872        // Test match query with multiple tokens
873        let results = index.query("rust programming", 10).await.unwrap();
874        assert!(!results.hits.is_empty());
875
876        // Verify hit has address (segment_id + doc_id)
877        let hit = &results.hits[0];
878        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
879
880        // Verify document retrieval by address
881        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
882        assert!(
883            !doc.field_values().is_empty(),
884            "Doc should have field values"
885        );
886
887        // Also verify doc retrieval directly by global doc_id
888        let doc = index.doc(0).await.unwrap().unwrap();
889        assert!(
890            !doc.field_values().is_empty(),
891            "Doc should have field values"
892        );
893    }
894
895    #[tokio::test]
896    async fn test_slice_cache_warmup_and_load() {
897        use crate::directories::SliceCachingDirectory;
898
899        let mut schema_builder = SchemaBuilder::default();
900        let title = schema_builder.add_text_field("title", true, true);
901        let body = schema_builder.add_text_field("body", true, true);
902        let schema = schema_builder.build();
903
904        let dir = RamDirectory::new();
905        let config = IndexConfig::default();
906
907        // Create index with some documents
908        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
909            .await
910            .unwrap();
911
912        for i in 0..10 {
913            let mut doc = Document::new();
914            doc.add_text(title, format!("Document {} about rust", i));
915            doc.add_text(body, format!("This is body text number {}", i));
916            writer.add_document(doc).unwrap();
917        }
918        writer.commit().await.unwrap();
919
920        // Open with slice caching and perform some operations to warm up cache
921        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
922        let index = Index::open(caching_dir, config.clone()).await.unwrap();
923
924        // Perform a search to warm up the cache
925        let results = index.query("rust", 10).await.unwrap();
926        assert!(!results.hits.is_empty());
927
928        // Check cache stats - should have cached some data
929        let stats = index.slice_cache_stats();
930        assert!(stats.total_bytes > 0, "Cache should have data after search");
931
932        // Save the cache
933        index.save_slice_cache().await.unwrap();
934
935        // Verify cache file was written
936        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());
937
938        // Now open with cache loading
939        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
940            .await
941            .unwrap();
942
943        // Cache should be prefilled
944        let stats2 = index2.slice_cache_stats();
945        assert!(
946            stats2.total_bytes > 0,
947            "Cache should be prefilled from file"
948        );
949
950        // Search should still work
951        let results2 = index2.query("rust", 10).await.unwrap();
952        assert_eq!(results.hits.len(), results2.hits.len());
953    }
954
955    #[tokio::test]
956    async fn test_multivalue_field_indexing_and_search() {
957        let mut schema_builder = SchemaBuilder::default();
958        let uris = schema_builder.add_text_field("uris", true, true);
959        let title = schema_builder.add_text_field("title", true, true);
960        let schema = schema_builder.build();
961
962        let dir = RamDirectory::new();
963        let config = IndexConfig::default();
964
965        // Create index and add document with multi-value field
966        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
967            .await
968            .unwrap();
969
970        let mut doc = Document::new();
971        doc.add_text(uris, "one");
972        doc.add_text(uris, "two");
973        doc.add_text(title, "Test Document");
974        writer.add_document(doc).unwrap();
975
976        // Add another document with different uris
977        let mut doc2 = Document::new();
978        doc2.add_text(uris, "three");
979        doc2.add_text(title, "Another Document");
980        writer.add_document(doc2).unwrap();
981
982        writer.commit().await.unwrap();
983
984        // Open for reading
985        let index = Index::open(dir, config).await.unwrap();
986        assert_eq!(index.num_docs(), 2);
987
988        // Verify document retrieval preserves all values
989        let doc = index.doc(0).await.unwrap().unwrap();
990        let all_uris: Vec<_> = doc.get_all(uris).collect();
991        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
992        assert_eq!(all_uris[0].as_text(), Some("one"));
993        assert_eq!(all_uris[1].as_text(), Some("two"));
994
995        // Verify to_json returns array for multi-value field
996        let json = doc.to_json(index.schema());
997        let uris_json = json.get("uris").unwrap();
998        assert!(uris_json.is_array(), "Multi-value field should be an array");
999        let uris_arr = uris_json.as_array().unwrap();
1000        assert_eq!(uris_arr.len(), 2);
1001        assert_eq!(uris_arr[0].as_str(), Some("one"));
1002        assert_eq!(uris_arr[1].as_str(), Some("two"));
1003
1004        // Verify both values are searchable
1005        let results = index.query("uris:one", 10).await.unwrap();
1006        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
1007        assert_eq!(results.hits[0].address.doc_id, 0);
1008
1009        let results = index.query("uris:two", 10).await.unwrap();
1010        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
1011        assert_eq!(results.hits[0].address.doc_id, 0);
1012
1013        let results = index.query("uris:three", 10).await.unwrap();
1014        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
1015        assert_eq!(results.hits[0].address.doc_id, 1);
1016
1017        // Verify searching for non-existent value returns no results
1018        let results = index.query("uris:nonexistent", 10).await.unwrap();
1019        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
1020    }
1021
1022    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
1023    ///
1024    /// This test verifies that:
1025    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
1026    /// 2. Search results are correct regardless of WAND optimization
1027    /// 3. Scores are reasonable for matching documents
1028    #[tokio::test]
1029    async fn test_wand_optimization_for_or_queries() {
1030        use crate::query::{BooleanQuery, TermQuery};
1031
1032        let mut schema_builder = SchemaBuilder::default();
1033        let content = schema_builder.add_text_field("content", true, true);
1034        let schema = schema_builder.build();
1035
1036        let dir = RamDirectory::new();
1037        let config = IndexConfig::default();
1038
1039        // Create index with documents containing various terms
1040        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1041            .await
1042            .unwrap();
1043
1044        // Doc 0: contains "rust" and "programming"
1045        let mut doc = Document::new();
1046        doc.add_text(content, "rust programming language is fast");
1047        writer.add_document(doc).unwrap();
1048
1049        // Doc 1: contains "rust" only
1050        let mut doc = Document::new();
1051        doc.add_text(content, "rust is a systems language");
1052        writer.add_document(doc).unwrap();
1053
1054        // Doc 2: contains "programming" only
1055        let mut doc = Document::new();
1056        doc.add_text(content, "programming is fun");
1057        writer.add_document(doc).unwrap();
1058
1059        // Doc 3: contains "python" (neither rust nor programming)
1060        let mut doc = Document::new();
1061        doc.add_text(content, "python is easy to learn");
1062        writer.add_document(doc).unwrap();
1063
1064        // Doc 4: contains both "rust" and "programming" multiple times
1065        let mut doc = Document::new();
1066        doc.add_text(content, "rust rust programming programming systems");
1067        writer.add_document(doc).unwrap();
1068
1069        writer.commit().await.unwrap();
1070
1071        // Open for reading
1072        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1073
1074        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
1075        let or_query = BooleanQuery::new()
1076            .should(TermQuery::text(content, "rust"))
1077            .should(TermQuery::text(content, "programming"));
1078
1079        let results = index.search(&or_query, 10).await.unwrap();
1080
1081        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
1082        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
1083
1084        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1085        assert!(doc_ids.contains(&0), "Should find doc 0");
1086        assert!(doc_ids.contains(&1), "Should find doc 1");
1087        assert!(doc_ids.contains(&2), "Should find doc 2");
1088        assert!(doc_ids.contains(&4), "Should find doc 4");
1089        assert!(
1090            !doc_ids.contains(&3),
1091            "Should NOT find doc 3 (only has 'python')"
1092        );
1093
1094        // Test 2: Single term query (should NOT use WAND, but still work)
1095        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
1096
1097        let results = index.search(&single_query, 10).await.unwrap();
1098        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1099
1100        // Test 3: Query with MUST (should NOT use WAND)
1101        let must_query = BooleanQuery::new()
1102            .must(TermQuery::text(content, "rust"))
1103            .should(TermQuery::text(content, "programming"));
1104
1105        let results = index.search(&must_query, 10).await.unwrap();
1106        // Must have "rust", optionally "programming"
1107        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1108
1109        // Test 4: Query with MUST_NOT (should NOT use WAND)
1110        let must_not_query = BooleanQuery::new()
1111            .should(TermQuery::text(content, "rust"))
1112            .should(TermQuery::text(content, "programming"))
1113            .must_not(TermQuery::text(content, "systems"));
1114
1115        let results = index.search(&must_not_query, 10).await.unwrap();
1116        // Should exclude docs with "systems" (doc 1 and 4)
1117        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1118        assert!(
1119            !doc_ids.contains(&1),
1120            "Should NOT find doc 1 (has 'systems')"
1121        );
1122        assert!(
1123            !doc_ids.contains(&4),
1124            "Should NOT find doc 4 (has 'systems')"
1125        );
1126
1127        // Test 5: Verify top-k limit works correctly with WAND
1128        let or_query = BooleanQuery::new()
1129            .should(TermQuery::text(content, "rust"))
1130            .should(TermQuery::text(content, "programming"));
1131
1132        let results = index.search(&or_query, 2).await.unwrap();
1133        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
1134
1135        // Top results should be docs that match both terms (higher scores)
1136        // Doc 0 and 4 contain both "rust" and "programming"
1137    }
1138
1139    /// Test that WAND optimization produces same results as non-WAND for correctness
1140    #[tokio::test]
1141    async fn test_wand_results_match_standard_boolean() {
1142        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
1143
1144        let mut schema_builder = SchemaBuilder::default();
1145        let content = schema_builder.add_text_field("content", true, true);
1146        let schema = schema_builder.build();
1147
1148        let dir = RamDirectory::new();
1149        let config = IndexConfig::default();
1150
1151        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1152            .await
1153            .unwrap();
1154
1155        // Add several documents
1156        for i in 0..10 {
1157            let mut doc = Document::new();
1158            let text = match i % 4 {
1159                0 => "apple banana cherry",
1160                1 => "apple orange",
1161                2 => "banana grape",
1162                _ => "cherry date",
1163            };
1164            doc.add_text(content, text);
1165            writer.add_document(doc).unwrap();
1166        }
1167
1168        writer.commit().await.unwrap();
1169        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1170
1171        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
1172        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
1173
1174        let bool_query = BooleanQuery::new()
1175            .should(TermQuery::text(content, "apple"))
1176            .should(TermQuery::text(content, "banana"));
1177
1178        let wand_results = index.search(&wand_query, 10).await.unwrap();
1179        let bool_results = index.search(&bool_query, 10).await.unwrap();
1180
1181        // Both should find the same documents
1182        assert_eq!(
1183            wand_results.hits.len(),
1184            bool_results.hits.len(),
1185            "WAND and Boolean should find same number of docs"
1186        );
1187
1188        let wand_docs: std::collections::HashSet<u32> =
1189            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
1190        let bool_docs: std::collections::HashSet<u32> =
1191            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
1192
1193        assert_eq!(
1194            wand_docs, bool_docs,
1195            "WAND and Boolean should find same documents"
1196        );
1197    }
1198
1199    #[tokio::test]
1200    async fn test_vector_index_threshold_switch() {
1201        use crate::dsl::{DenseVectorConfig, VectorIndexType};
1202
1203        // Create schema with dense vector field configured for IVF-RaBitQ
1204        let mut schema_builder = SchemaBuilder::default();
1205        let title = schema_builder.add_text_field("title", true, true);
1206        let embedding = schema_builder.add_dense_vector_field_with_config(
1207            "embedding",
1208            true, // indexed
1209            true, // stored
1210            DenseVectorConfig {
1211                dim: 8,
1212                index_type: VectorIndexType::IvfRaBitQ,
1213                store_raw: true,
1214                num_clusters: Some(4), // Small for test
1215                nprobe: 2,
1216                mrl_dim: None,
1217                build_threshold: Some(50), // Build when we have 50+ vectors
1218            },
1219        );
1220        let schema = schema_builder.build();
1221
1222        let dir = RamDirectory::new();
1223        let config = IndexConfig::default();
1224
1225        // Phase 1: Add vectors below threshold (should use Flat index)
1226        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1227            .await
1228            .unwrap();
1229
1230        // Add 30 documents (below threshold of 50)
1231        for i in 0..30 {
1232            let mut doc = Document::new();
1233            doc.add_text(title, format!("Document {}", i));
1234            // Simple embedding: [i, i, i, i, i, i, i, i] normalized
1235            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
1236            doc.add_dense_vector(embedding, vec);
1237            writer.add_document(doc).unwrap();
1238        }
1239        writer.commit().await.unwrap();
1240
1241        // Open index and verify it's using Flat (not built yet)
1242        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1243        assert!(
1244            index.trained_centroids.is_empty(),
1245            "Should not have trained centroids below threshold"
1246        );
1247
1248        // Search should work with Flat index
1249        let query_vec: Vec<f32> = vec![0.5; 8];
1250        let segments = index.segment_readers();
1251        assert!(!segments.is_empty());
1252
1253        let results = segments[0]
1254            .search_dense_vector(
1255                embedding,
1256                &query_vec,
1257                5,
1258                1,
1259                crate::query::MultiValueCombiner::Max,
1260            )
1261            .unwrap();
1262        assert!(!results.is_empty(), "Flat search should return results");
1263
1264        // Phase 2: Add more vectors to cross threshold
1265        let writer = IndexWriter::open(dir.clone(), config.clone())
1266            .await
1267            .unwrap();
1268
1269        // Add 30 more documents (total 60, above threshold of 50)
1270        for i in 30..60 {
1271            let mut doc = Document::new();
1272            doc.add_text(title, format!("Document {}", i));
1273            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
1274            doc.add_dense_vector(embedding, vec);
1275            writer.add_document(doc).unwrap();
1276        }
1277        // Commit auto-triggers vector index build when threshold is crossed
1278        writer.commit().await.unwrap();
1279
1280        // Verify centroids were trained (auto-triggered)
1281        assert!(
1282            writer.is_vector_index_built(embedding).await,
1283            "Vector index should be built after crossing threshold"
1284        );
1285
1286        // Reopen index and verify trained structures are loaded
1287        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1288        assert!(
1289            index.trained_centroids.contains_key(&embedding.0),
1290            "Should have loaded trained centroids for embedding field"
1291        );
1292
1293        // Search should still work
1294        let segments = index.segment_readers();
1295        let results = segments[0]
1296            .search_dense_vector(
1297                embedding,
1298                &query_vec,
1299                5,
1300                1,
1301                crate::query::MultiValueCombiner::Max,
1302            )
1303            .unwrap();
1304        assert!(
1305            !results.is_empty(),
1306            "Search should return results after build"
1307        );
1308
1309        // Phase 3: Verify calling build_vector_index again is a no-op
1310        let writer = IndexWriter::open(dir.clone(), config.clone())
1311            .await
1312            .unwrap();
1313        writer.build_vector_index().await.unwrap(); // Should skip training
1314
1315        // Still built
1316        assert!(writer.is_vector_index_built(embedding).await);
1317    }
1318}