hermes_core/index/mod.rs

//! Index - multi-segment async search index
//!
//! Components:
//! - Index: the main entry point for searching
//! - IndexWriter: for adding documents and committing segments (native only)
//! - Multi-segment storage with background merging
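//!
//! A minimal usage sketch (assumes a tokio runtime; writing requires the
//! `native` feature):
//!
//! ```ignore
//! let index = Index::open(dir, IndexConfig::default()).await?;
//! let response = index.query("hello world", 10).await?;
//! for hit in &response.hits {
//!     let doc = index.get_document(&hit.address).await?;
//! }
//! ```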

use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;

use rustc_hash::FxHashMap;

use crate::DocId;
use crate::directories::{Directory, SliceCachingDirectory};
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentId, SegmentReader};
use crate::structures::BlockPostingList;
use crate::structures::{CoarseCentroids, PQCodebook};

#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use writer::IndexWriter;

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

/// Default file name for the slice cache
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

/// Index configuration
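///
/// # Example
///
/// A minimal sketch of overriding a few knobs (values are illustrative, not
/// tuning advice):
///
/// ```ignore
/// let config = IndexConfig {
///     num_indexing_threads: 2,
///     max_docs_per_segment: 50_000,
///     ..Default::default()
/// };
/// ```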
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents are distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for the term dictionary, per segment
    pub term_cache_blocks: usize,
    /// Block cache size for the document store, per segment
    pub store_cache_blocks: usize,
    /// Maximum documents per segment before auto-commit
    pub max_docs_per_segment: u32,
    /// Merge policy for background segment merging
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, or performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let cpus = num_cpus::get().max(1);
        #[cfg(not(feature = "native"))]
        let cpus = 1;

        Self {
            num_threads: cpus,
            num_indexing_threads: 1,
            num_compression_threads: cpus,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_docs_per_segment: 100_000,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
        }
    }
}

/// Multi-segment async Index
///
/// The main entry point for searching. Manages multiple segments
/// and provides unified search across all of them.
pub struct Index<D: Directory> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Cached global statistics for cross-segment IDF computation
    global_stats: crate::query::GlobalStatsCache,
    /// Index-level trained centroids per field (loaded from metadata)
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Index-level trained PQ codebooks per field (for ScaNN)
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<D: Directory> Index<D> {
    /// Open an existing index from a directory
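    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a tokio runtime and an index previously
    /// created by `IndexWriter` in `dir`):
    ///
    /// ```ignore
    /// let index = Index::open(dir, IndexConfig::default()).await?;
    /// println!("{} docs", index.num_docs());
    /// ```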
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Load metadata and trained structures
        let (segments, trained_centroids, trained_codebooks) =
            Self::load_segments_and_trained(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Use the schema's default_fields if specified; otherwise fall back to all indexed text fields
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids,
            trained_codebooks,
            #[cfg(feature = "native")]
            thread_pool,
        })
    }

    /// Load segments and trained structures from metadata
    async fn load_segments_and_trained(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        FxHashMap<u32, Arc<CoarseCentroids>>,
        FxHashMap<u32, Arc<PQCodebook>>,
    )> {
        // Load metadata
        let meta = Self::load_metadata(directory).await?;

        // Load trained centroids and codebooks
        let (trained_centroids, trained_codebooks) =
            meta.load_trained_structures(directory.as_ref()).await;

        // Load segments, assigning each a contiguous global doc_id range
        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in meta.segments {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok((segments, trained_centroids, trained_codebooks))
    }

    /// Load metadata from metadata.json
    async fn load_metadata(directory: &Arc<D>) -> Result<IndexMetadata> {
        let meta_path = Path::new(INDEX_META_FILENAME);
        if directory.exists(meta_path).await.unwrap_or(false) {
            let slice = directory.open_read(meta_path).await?;
            let bytes = slice.read_bytes().await?;
            let meta: IndexMetadata = serde_json::from_slice(bytes.as_slice())
                .map_err(|e| Error::Serialization(e.to_string()))?;
            return Ok(meta);
        }
        Ok(IndexMetadata::new())
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get a reference to the underlying directory
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Total number of documents across all segments
    pub fn num_docs(&self) -> u32 {
        self.segments.read().iter().map(|s| s.num_docs()).sum()
    }

    /// Get a document by global doc_id (async)
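    ///
    /// Global doc_ids concatenate segments in load order; the id is mapped
    /// back to a segment-local id by subtracting the running offset of all
    /// preceding segments.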
    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
        let segments = self.segments.read().clone();

        let mut offset = 0u32;
        for segment in segments.iter() {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }

        Ok(None)
    }

    /// Get posting lists for a term across all segments (async)
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
        let segments = self.segments.read().clone();
        let mut results = Vec::new();

        for segment in segments.iter() {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((Arc::clone(segment), postings));
            }
        }

        Ok(results)
    }

    /// Execute CPU-intensive work on the thread pool (native only)
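    ///
    /// A minimal sketch (the closure here is illustrative):
    ///
    /// ```ignore
    /// let sum = index.spawn_blocking(|| (0..1_000u64).sum::<u64>()).await;
    /// ```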
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }

    /// Get segment readers for query execution
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }

    /// Reload segments from the directory (after new segments are added or merges complete)
    ///
    /// Reads the current metadata.json and loads only the segments that exist.
    /// Safe to call at any time; it will not reference deleted segments.
    ///
    /// Note: this only reloads segments, not trained structures (those are immutable once built).
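    ///
    /// A sketch of a refresh pattern, assuming a single reader polling for changes:
    ///
    /// ```ignore
    /// if index.needs_reload().await? {
    ///     index.reload().await?;
    /// }
    /// ```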
    pub async fn reload(&self) -> Result<()> {
        // Load fresh metadata from disk
        let meta = Self::load_metadata(&self.directory).await?;

        // Load only segments that exist in the current metadata
        let mut new_segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in meta.segments {
            let segment_id = match SegmentId::from_hex(&id_str) {
                Some(id) => id,
                None => {
                    log::warn!("Invalid segment ID in metadata: {}", id_str);
                    continue;
                }
            };

            // Try to open the segment; skip it if files don't exist (may have been deleted by a merge)
            match SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_id_offset,
                self.config.term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    doc_id_offset += reader.meta().num_docs;
                    new_segments.push(Arc::new(reader));
                }
                Err(e) => {
                    // Segment files may have been deleted by a concurrent merge - skip it
                    log::warn!(
                        "Could not open segment {}: {:?} (may have been merged)",
                        id_str,
                        e
                    );
                }
            }
        }

        *self.segments.write() = new_segments;
        // Invalidate the global stats cache since segments changed
        self.global_stats.invalidate();
        Ok(())
    }

    /// Check if segments need reloading (metadata changed since last load)
    pub async fn needs_reload(&self) -> Result<bool> {
        let meta = Self::load_metadata(&self.directory).await?;
        let current_segments = self.segments.read();

        // Compare segment count and IDs
        if meta.segments.len() != current_segments.len() {
            return Ok(true);
        }

        for (meta_id, reader) in meta.segments.iter().zip(current_segments.iter()) {
            let reader_id = SegmentId::from_u128(reader.meta().id).to_hex();
            if meta_id != &reader_id {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Get global statistics for cross-segment IDF computation (sync, basic stats only)
    ///
    /// Returns cached stats if available. For full stats including term frequencies,
    /// call `build_global_stats().await` first.
    ///
    /// This sync version only includes:
    /// - Total docs
    /// - Sparse vector dimension document frequencies
    /// - Average field lengths
    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
        self.global_stats.get()
    }

    /// Build and cache global statistics (async, includes term frequencies)
    ///
    /// Iterates the term dictionaries of all segments to compute accurate
    /// cross-segment IDF values for full-text queries.
    ///
    /// Call this once after opening the index or after `reload()`.
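    ///
    /// A typical call pattern (sketch):
    ///
    /// ```ignore
    /// index.build_global_stats().await?;
    /// let response = index.query("body:rust", 10).await?;
    /// ```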
    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
        // Return the cached stats if available
        if let Some(stats) = self.global_stats.get() {
            return Ok(stats);
        }

        let segments = self.segments.read().clone();
        let schema = &self.schema;
        let mut builder = crate::query::GlobalStatsBuilder::new();

        // Track per-field length sums for computing the global average
        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
            rustc_hash::FxHashMap::default();

        for segment in &segments {
            let num_docs = segment.num_docs() as u64;
            builder.total_docs += num_docs;

            // Aggregate sparse vector statistics
            for (&field_id, sparse_index) in segment.sparse_indexes() {
                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
                    if let Some(pl) = posting_list {
                        builder.add_sparse_df(
                            crate::dsl::Field(field_id),
                            dim_id as u32,
                            pl.doc_count() as u64,
                        );
                    }
                }
            }

            // Aggregate text field average lengths
            for (field, entry) in schema.fields() {
                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
                    let avg_len = segment.avg_field_len(field);
                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
                    *sum += (avg_len * num_docs as f32) as u64;
                    *count += num_docs;
                }
            }

            // Iterate the term dictionary to get term document frequencies
            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
                builder.add_text_df(field, term, doc_freq as u64);
            }
        }

        // Set global average field lengths
        for (field_id, (sum, count)) in field_len_sums {
            if count > 0 {
                let global_avg = sum as f32 / count as f32;
                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
            }
        }

        let generation = self.global_stats.generation();
        let stats = builder.build(generation);
        self.global_stats.set_stats(stats);

        Ok(self.global_stats.get().unwrap())
    }

    /// Search and return results with document addresses (no document content)
    ///
    /// This is the primary search method. Use `get_document` to fetch document content.
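    ///
    /// A sketch of the search-then-fetch flow (`query` is any `Query` value):
    ///
    /// ```ignore
    /// let resp = index.search(&query, 10).await?;
    /// if let Some(hit) = resp.hits.first() {
    ///     let doc = index.get_document(&hit.address).await?;
    /// }
    /// ```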
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Search with offset for pagination
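    ///
    /// For example, page 2 with 10 hits per page (sketch):
    ///
    /// ```ignore
    /// let page2 = index.search_offset(&query, 10, 10).await?;
    /// ```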
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Search with matched field ordinals (for multi-valued fields with position tracking)
    ///
    /// Returns which array elements matched for each field with position tracking enabled.
    pub async fn search_with_matched_fields(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, 0, true).await
    }

    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<crate::query::SearchResponse> {
        let segments = self.segments.read().clone();
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        // Fetch enough results from each segment to cover offset + limit
        let fetch_limit = offset + limit;
        for segment in &segments {
            let segment_id = segment.meta().id;
            let results = if collect_positions {
                crate::query::search_segment_with_positions(segment.as_ref(), query, fetch_limit)
                    .await?
            } else {
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?
            };
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        // Sort by score, descending
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        // Total hits before pagination
        let total_hits = all_results.len() as u32;

        // Apply offset and limit
        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get a document by its unique address (segment_id + local doc_id)
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<Document>> {
        let segment_id = address
            .segment_id_u128()
            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;

        let segments = self.segments.read().clone();
        for segment in &segments {
            if segment.meta().id == segment_id {
                return segment.doc(address.doc_id).await;
            }
        }

        Ok(None)
    }

    /// Get the default fields for this index
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Set the default fields for query parsing
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    /// Get the tokenizer registry
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }

    /// Create a query parser for this index
    ///
    /// If the schema contains query router rules, they will be used to route
    /// queries to specific fields based on regex patterns.
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        // Check if the schema has query routers
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty() {
            // Try to create a router from the schema's rules
            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
                return crate::dsl::QueryLanguageParser::with_router(
                    Arc::clone(&self.schema),
                    self.default_fields.clone(),
                    Arc::clone(&self.tokenizers),
                    router,
                );
            }
        }

        // Fall back to a parser without a router
        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Parse and search using a query string
    ///
    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
    /// and simple text (tokenized and searched across the default fields).
    /// Returns document addresses (segment_id + doc_id) without document content.
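    ///
    /// A sketch of both styles (field names are illustrative):
    ///
    /// ```ignore
    /// let resp = index.query("title:rust AND body:async", 10).await?;
    /// let resp = index.query("hello world", 10).await?;
    /// ```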
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Query with offset for pagination
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser.parse(query_str).map_err(Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }
}

/// Methods for opening an index with slice caching
impl<D: Directory> Index<SliceCachingDirectory<D>> {
    /// Open an index with slice caching, automatically loading the cache file if present
    ///
    /// Wraps the directory in a `SliceCachingDirectory` and attempts to load
    /// any existing slice cache file to prefill the cache with hot data.
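    ///
    /// A sketch of the cache lifecycle (assumes the `native` feature and a
    /// writable directory):
    ///
    /// ```ignore
    /// let index = Index::open_with_cache(dir, IndexConfig::default(), 64 << 20).await?;
    /// let _ = index.query("rust", 10).await?;
    /// index.save_slice_cache().await?;
    /// ```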
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        // Try to load an existing slice cache
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Serialize the current slice cache to the index directory
    ///
    /// Saves all cached slices to a single file that can be loaded
    /// on subsequent index opens for faster startup.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: crate::directories::DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Get slice cache statistics
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}

/// Warm up the slice cache by opening an index and performing typical read operations
///
/// Opens an index using a `SliceCachingDirectory`, performs operations that would
/// typically be done during search (reading term dictionaries, posting lists),
/// and then serializes the cache to a file for future use.
///
/// The resulting cache file contains the "hot" data read during warmup,
/// allowing subsequent index opens to prefill the cache and avoid cold-start latency.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    // Warm up by loading segment metadata and term dictionaries.
    // SegmentReader::open already reads the essential metadata;
    // additional warmup can be done by iterating terms or running sample queries.

    // Save the cache
    index.save_slice_cache().await?;

    Ok(())
}

#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids: self.trained_centroids.clone(),
            trained_codebooks: self.trained_codebooks.clone(),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Check postings
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document
        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5, // Small segments for testing
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add documents in batches to create multiple segments
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).await.unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Open and check
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments
        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Should have 3 segments
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        // Force merge
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Verify all documents are accessible
        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // Test match query with multiple default fields
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        // Test match query with multiple tokens
        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Verify the hit has an address (segment_id + doc_id)
        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Verify document retrieval by address
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // Also verify doc retrieval directly by global doc_id
        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create an index with some documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Open with slice caching and perform some operations to warm up the cache
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        // Perform a search to warm up the cache
        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Check cache stats - should have cached some data
        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        // Save the cache
        index.save_slice_cache().await.unwrap();

        // Verify the cache file was written
        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());

        // Now open with cache loading
        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        // Cache should be prefilled
        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        // Search should still work
        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create an index and add a document with a multi-value field
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        // Add another document with different uris
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Verify document retrieval preserves all values
        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // Verify to_json returns an array for the multi-value field
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Verify both values are searchable
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        // Verify searching for a non-existent value returns no results
        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
    ///
    /// This test verifies that:
    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
    /// 2. Search results are correct regardless of WAND optimization
    /// 3. Scores are reasonable for matching documents
    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create an index with documents containing various terms
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Doc 0: contains "rust" and "programming"
        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).await.unwrap();

        // Doc 1: contains "rust" only
        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).await.unwrap();

        // Doc 2: contains "programming" only
        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).await.unwrap();

        // Doc 3: contains "python" (neither rust nor programming)
        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).await.unwrap();

        // Doc 4: contains both "rust" and "programming" multiple times
        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        // Test 2: Single term query (should NOT use WAND, but still work)
        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 3: Query with MUST (should NOT use WAND)
        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        // Must have "rust", optionally "programming"
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // Test 4: Query with MUST_NOT (should NOT use WAND)
        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        // Should exclude docs with "systems" (docs 1 and 4)
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // Test 5: Verify the top-k limit works correctly with WAND
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");

        // The top results should be docs that match both terms (higher scores):
        // docs 0 and 4 contain both "rust" and "programming"
    }

    /// Test that WAND optimization produces the same results as standard Boolean evaluation
    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add several documents
        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).await.unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Compare an explicit WandOrQuery with the auto-optimized BooleanQuery
        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        // Both should find the same documents
        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};

        // Create a schema with a dense vector field configured for IVF-RaBitQ
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true, // indexed
            true, // stored
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                store_raw: true,
                num_clusters: Some(4), // Small for test
                nprobe: 2,
                mrl_dim: None,
                build_threshold: Some(50), // Build when we have 50+ vectors
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Phase 1: Add vectors below the threshold (should use a Flat index)
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 documents (below the threshold of 50)
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            // Simple embedding: eight copies of i / 30.0
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Open the index and verify it's using Flat (not built yet)
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.is_empty(),
            "Should not have trained centroids below threshold"
        );

        // Search should work with the Flat index
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        // Phase 2: Add more vectors to cross the threshold
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // Add 30 more documents (60 total, above the threshold of 50)
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).await.unwrap();
        }
        // Commit auto-triggers the vector index build when the threshold is crossed
        writer.commit().await.unwrap();

        // Verify centroids were trained (auto-triggered)
        assert!(
            writer.is_vector_index_built(embedding).await,
            "Vector index should be built after crossing threshold"
        );

        // Reopen the index and verify the trained structures are loaded
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.contains_key(&embedding.0),
            "Should have loaded trained centroids for embedding field"
        );

        // Search should still work
        let segments = index.segment_readers();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Phase 3: Verify that calling build_vector_index again is a no-op
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap(); // Should skip training

        // Still built
        assert!(writer.is_vector_index_built(embedding).await);
    }
}