Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! Components:
4//! - Index: main entry point for searching
5//! - IndexWriter: for adding documents and committing segments (native only)
6//! - Supports multiple segments with merge
7
8use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12
13use crate::DocId;
14use crate::directories::{Directory, SliceCachingDirectory};
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{SegmentId, SegmentReader};
18use crate::structures::BlockPostingList;
19
20#[cfg(feature = "native")]
21mod writer;
22#[cfg(feature = "native")]
23pub use writer::IndexWriter;
24
25#[cfg(feature = "native")]
26mod helpers;
27#[cfg(feature = "native")]
28pub use helpers::{
29    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
30    index_documents_from_reader, index_json_document, parse_schema,
31};
32
/// File name under which the serialized slice cache is stored inside the index directory
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
35
36/// Index configuration
37#[derive(Debug, Clone)]
38pub struct IndexConfig {
39    /// Number of threads for CPU-intensive tasks (search parallelism)
40    pub num_threads: usize,
41    /// Number of parallel segment builders (documents distributed round-robin)
42    pub num_indexing_threads: usize,
43    /// Number of threads for parallel block compression within each segment
44    pub num_compression_threads: usize,
45    /// Block cache size for term dictionary per segment
46    pub term_cache_blocks: usize,
47    /// Block cache size for document store per segment
48    pub store_cache_blocks: usize,
49    /// Max documents per segment before auto-commit
50    pub max_docs_per_segment: u32,
51    /// Merge policy for background segment merging
52    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
53    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
54    pub optimization: crate::structures::IndexOptimization,
55}
56
57impl Default for IndexConfig {
58    fn default() -> Self {
59        #[cfg(feature = "native")]
60        let cpus = num_cpus::get().max(1);
61        #[cfg(not(feature = "native"))]
62        let cpus = 1;
63
64        Self {
65            num_threads: cpus,
66            num_indexing_threads: 1,
67            num_compression_threads: cpus,
68            term_cache_blocks: 256,
69            store_cache_blocks: 32,
70            max_docs_per_segment: 100_000,
71            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
72            optimization: crate::structures::IndexOptimization::default(),
73        }
74    }
75}
76
77/// Multi-segment async Index
78///
79/// The main entry point for searching. Manages multiple segments
80/// and provides unified search across all of them.
81pub struct Index<D: Directory> {
82    directory: Arc<D>,
83    schema: Arc<Schema>,
84    config: IndexConfig,
85    segments: RwLock<Vec<Arc<SegmentReader>>>,
86    default_fields: Vec<crate::Field>,
87    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
88    /// Cached global statistics for cross-segment IDF computation
89    global_stats: crate::query::GlobalStatsCache,
90    #[cfg(feature = "native")]
91    thread_pool: Arc<rayon::ThreadPool>,
92}
93
94impl<D: Directory> Index<D> {
95    /// Open an existing index from a directory
96    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
97        let directory = Arc::new(directory);
98
99        // Read schema
100        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
101        let schema_bytes = schema_slice.read_bytes().await?;
102        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
103            .map_err(|e| Error::Serialization(e.to_string()))?;
104        let schema = Arc::new(schema);
105
106        // Read segment list
107        let segments = Self::load_segments(&directory, &schema, &config).await?;
108
109        #[cfg(feature = "native")]
110        let thread_pool = {
111            let pool = rayon::ThreadPoolBuilder::new()
112                .num_threads(config.num_threads)
113                .build()
114                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
115            Arc::new(pool)
116        };
117
118        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
119        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
120            schema.default_fields().to_vec()
121        } else {
122            schema
123                .fields()
124                .filter(|(_, entry)| {
125                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
126                })
127                .map(|(field, _)| field)
128                .collect()
129        };
130
131        Ok(Self {
132            directory,
133            schema,
134            config,
135            segments: RwLock::new(segments),
136            default_fields,
137            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
138            global_stats: crate::query::GlobalStatsCache::new(),
139            #[cfg(feature = "native")]
140            thread_pool,
141        })
142    }
143
144    async fn load_segments(
145        directory: &Arc<D>,
146        schema: &Arc<Schema>,
147        config: &IndexConfig,
148    ) -> Result<Vec<Arc<SegmentReader>>> {
149        // Read segments.json which lists all segment IDs
150        let segments_path = Path::new("segments.json");
151        if !directory.exists(segments_path).await? {
152            return Ok(Vec::new());
153        }
154
155        let segments_slice = directory.open_read(segments_path).await?;
156        let segments_bytes = segments_slice.read_bytes().await?;
157        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
158            .map_err(|e| Error::Serialization(e.to_string()))?;
159
160        let mut segments = Vec::new();
161        let mut doc_id_offset = 0u32;
162
163        for id_str in segment_ids {
164            let segment_id = SegmentId::from_hex(&id_str)
165                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
166            let reader = SegmentReader::open(
167                directory.as_ref(),
168                segment_id,
169                Arc::clone(schema),
170                doc_id_offset,
171                config.term_cache_blocks,
172            )
173            .await?;
174
175            doc_id_offset += reader.meta().num_docs;
176            segments.push(Arc::new(reader));
177        }
178
179        Ok(segments)
180    }
181
182    /// Get the schema
183    pub fn schema(&self) -> &Schema {
184        &self.schema
185    }
186
187    /// Get a reference to the underlying directory
188    pub fn directory(&self) -> &D {
189        &self.directory
190    }
191
192    /// Total number of documents across all segments
193    pub fn num_docs(&self) -> u32 {
194        self.segments.read().iter().map(|s| s.num_docs()).sum()
195    }
196
197    /// Get a document by global doc_id (async)
198    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
199        let segments = self.segments.read().clone();
200
201        let mut offset = 0u32;
202        for segment in segments.iter() {
203            let segment_docs = segment.meta().num_docs;
204            if doc_id < offset + segment_docs {
205                let local_doc_id = doc_id - offset;
206                return segment.doc(local_doc_id).await;
207            }
208            offset += segment_docs;
209        }
210
211        Ok(None)
212    }
213
214    /// Get posting lists for a term across all segments (async)
215    pub async fn get_postings(
216        &self,
217        field: Field,
218        term: &[u8],
219    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
220        let segments = self.segments.read().clone();
221        let mut results = Vec::new();
222
223        for segment in segments.iter() {
224            if let Some(postings) = segment.get_postings(field, term).await? {
225                results.push((Arc::clone(segment), postings));
226            }
227        }
228
229        Ok(results)
230    }
231
232    /// Execute CPU-intensive work on thread pool (native only)
233    #[cfg(feature = "native")]
234    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
235    where
236        F: FnOnce() -> R + Send + 'static,
237        R: Send + 'static,
238    {
239        let (tx, rx) = tokio::sync::oneshot::channel();
240        self.thread_pool.spawn(move || {
241            let result = f();
242            let _ = tx.send(result);
243        });
244        rx.await.expect("Thread pool task panicked")
245    }
246
247    /// Get segment readers for query execution
248    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
249        self.segments.read().clone()
250    }
251
252    /// Reload segments from directory (after new segments added)
253    pub async fn reload(&self) -> Result<()> {
254        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
255        *self.segments.write() = new_segments;
256        // Invalidate global stats cache since segments changed
257        self.global_stats.invalidate();
258        Ok(())
259    }
260
261    /// Get global statistics for cross-segment IDF computation (sync, basic stats only)
262    ///
263    /// Returns cached stats if available. For full stats including term frequencies,
264    /// call `build_global_stats().await` first.
265    ///
266    /// This sync version only includes:
267    /// - Total docs
268    /// - Sparse vector dimension document frequencies
269    /// - Average field lengths
270    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
271        self.global_stats.get()
272    }
273
274    /// Build and cache global statistics (async, includes term frequencies)
275    ///
276    /// This iterates term dictionaries across all segments to compute
277    /// accurate cross-segment IDF values for full-text queries.
278    ///
279    /// Call this once after opening the index or after reload().
280    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
281        // Return cached if available
282        if let Some(stats) = self.global_stats.get() {
283            return Ok(stats);
284        }
285
286        let segments = self.segments.read().clone();
287        let schema = &self.schema;
288        let mut builder = crate::query::GlobalStatsBuilder::new();
289
290        // Track field length sums for computing global avg
291        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
292            rustc_hash::FxHashMap::default();
293
294        for segment in &segments {
295            let num_docs = segment.num_docs() as u64;
296            builder.total_docs += num_docs;
297
298            // Aggregate sparse vector statistics
299            for (&field_id, sparse_index) in segment.sparse_indexes() {
300                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
301                    if let Some(pl) = posting_list {
302                        builder.add_sparse_df(
303                            crate::dsl::Field(field_id),
304                            dim_id as u32,
305                            pl.doc_count() as u64,
306                        );
307                    }
308                }
309            }
310
311            // Aggregate text field average lengths
312            for (field, entry) in schema.fields() {
313                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
314                    let avg_len = segment.avg_field_len(field);
315                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
316                    *sum += (avg_len * num_docs as f32) as u64;
317                    *count += num_docs;
318                }
319            }
320
321            // Iterate term dictionary to get term document frequencies
322            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
323                builder.add_text_df(field, term, doc_freq as u64);
324            }
325        }
326
327        // Set global average field lengths
328        for (field_id, (sum, count)) in field_len_sums {
329            if count > 0 {
330                let global_avg = sum as f32 / count as f32;
331                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
332            }
333        }
334
335        let generation = self.global_stats.generation();
336        let stats = builder.build(generation);
337        self.global_stats.set_stats(stats);
338
339        Ok(self.global_stats.get().unwrap())
340    }
341
342    /// Search and return results with document addresses (no document content)
343    ///
344    /// This is the primary search method. Use `get_document` to fetch document content.
345    pub async fn search(
346        &self,
347        query: &dyn crate::query::Query,
348        limit: usize,
349    ) -> Result<crate::query::SearchResponse> {
350        self.search_offset(query, limit, 0).await
351    }
352
353    /// Search with offset for pagination
354    pub async fn search_offset(
355        &self,
356        query: &dyn crate::query::Query,
357        limit: usize,
358        offset: usize,
359    ) -> Result<crate::query::SearchResponse> {
360        self.search_internal(query, limit, offset, false).await
361    }
362
363    /// Search with matched field ordinals (for multi-valued fields with position tracking)
364    ///
365    /// Returns which array elements matched for each field with position tracking enabled.
366    pub async fn search_with_matched_fields(
367        &self,
368        query: &dyn crate::query::Query,
369        limit: usize,
370    ) -> Result<crate::query::SearchResponse> {
371        self.search_internal(query, limit, 0, true).await
372    }
373
374    async fn search_internal(
375        &self,
376        query: &dyn crate::query::Query,
377        limit: usize,
378        offset: usize,
379        collect_positions: bool,
380    ) -> Result<crate::query::SearchResponse> {
381        let segments = self.segments.read().clone();
382        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
383
384        // Fetch enough results to cover offset + limit
385        let fetch_limit = offset + limit;
386        for segment in &segments {
387            let segment_id = segment.meta().id;
388            let results = crate::query::search_segment_with_positions(
389                segment.as_ref(),
390                query,
391                fetch_limit,
392                collect_positions,
393            )
394            .await?;
395            for result in results {
396                all_results.push((segment_id, result));
397            }
398        }
399
400        // Sort by score descending
401        all_results.sort_by(|a, b| {
402            b.1.score
403                .partial_cmp(&a.1.score)
404                .unwrap_or(std::cmp::Ordering::Equal)
405        });
406
407        // Total hits before pagination
408        let total_hits = all_results.len() as u32;
409
410        // Apply offset and limit
411        let hits: Vec<crate::query::SearchHit> = all_results
412            .into_iter()
413            .skip(offset)
414            .take(limit)
415            .map(|(segment_id, result)| crate::query::SearchHit {
416                address: crate::query::DocAddress::new(segment_id, result.doc_id),
417                score: result.score,
418                matched_fields: result.extract_ordinals(),
419            })
420            .collect();
421
422        Ok(crate::query::SearchResponse { hits, total_hits })
423    }
424
425    /// Get a document by its unique address (segment_id + local doc_id)
426    pub async fn get_document(
427        &self,
428        address: &crate::query::DocAddress,
429    ) -> Result<Option<Document>> {
430        let segment_id = address
431            .segment_id_u128()
432            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
433
434        let segments = self.segments.read().clone();
435        for segment in &segments {
436            if segment.meta().id == segment_id {
437                return segment.doc(address.doc_id).await;
438            }
439        }
440
441        Ok(None)
442    }
443
444    /// Get the default fields for this index
445    pub fn default_fields(&self) -> &[crate::Field] {
446        &self.default_fields
447    }
448
449    /// Set the default fields for query parsing
450    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
451        self.default_fields = fields;
452    }
453
454    /// Get the tokenizer registry
455    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
456        &self.tokenizers
457    }
458
459    /// Create a query parser for this index
460    ///
461    /// If the schema contains query router rules, they will be used to route
462    /// queries to specific fields based on regex patterns.
463    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
464        // Check if schema has query routers
465        let query_routers = self.schema.query_routers();
466        if !query_routers.is_empty() {
467            // Try to create a router from the schema's rules
468            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
469                return crate::dsl::QueryLanguageParser::with_router(
470                    Arc::clone(&self.schema),
471                    self.default_fields.clone(),
472                    Arc::clone(&self.tokenizers),
473                    router,
474                );
475            }
476        }
477
478        // Fall back to parser without router
479        crate::dsl::QueryLanguageParser::new(
480            Arc::clone(&self.schema),
481            self.default_fields.clone(),
482            Arc::clone(&self.tokenizers),
483        )
484    }
485
486    /// Parse and search using a query string
487    ///
488    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
489    /// and simple text (tokenized and searched across default fields).
490    /// Returns document addresses (segment_id + doc_id) without document content.
491    pub async fn query(
492        &self,
493        query_str: &str,
494        limit: usize,
495    ) -> Result<crate::query::SearchResponse> {
496        self.query_offset(query_str, limit, 0).await
497    }
498
499    /// Query with offset for pagination
500    pub async fn query_offset(
501        &self,
502        query_str: &str,
503        limit: usize,
504        offset: usize,
505    ) -> Result<crate::query::SearchResponse> {
506        let parser = self.query_parser();
507        let query = parser.parse(query_str).map_err(Error::Query)?;
508        self.search_offset(query.as_ref(), limit, offset).await
509    }
510}
511
512/// Methods for opening index with slice caching
513impl<D: Directory> Index<SliceCachingDirectory<D>> {
514    /// Open an index with slice caching, automatically loading the cache file if present
515    ///
516    /// This wraps the directory in a SliceCachingDirectory and attempts to load
517    /// any existing slice cache file to prefill the cache with hot data.
518    pub async fn open_with_cache(
519        directory: D,
520        config: IndexConfig,
521        cache_max_bytes: usize,
522    ) -> Result<Self> {
523        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
524
525        // Try to load existing slice cache
526        let cache_path = Path::new(SLICE_CACHE_FILENAME);
527        if let Ok(true) = caching_dir.inner().exists(cache_path).await
528            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
529            && let Ok(bytes) = slice.read_bytes().await
530        {
531            let _ = caching_dir.deserialize(bytes.as_slice());
532        }
533
534        Self::open(caching_dir, config).await
535    }
536
537    /// Serialize the current slice cache to the index directory
538    ///
539    /// This saves all cached slices to a single file that can be loaded
540    /// on subsequent index opens for faster startup.
541    #[cfg(feature = "native")]
542    pub async fn save_slice_cache(&self) -> Result<()>
543    where
544        D: crate::directories::DirectoryWriter,
545    {
546        let cache_data = self.directory.serialize();
547        let cache_path = Path::new(SLICE_CACHE_FILENAME);
548        self.directory
549            .inner()
550            .write(cache_path, &cache_data)
551            .await?;
552        Ok(())
553    }
554
555    /// Get slice cache statistics
556    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
557        self.directory.stats()
558    }
559}
560
/// Open an index through a fresh slice cache and persist whatever was cached
///
/// The index is opened on a `SliceCachingDirectory` limited to
/// `cache_max_bytes`, so every read performed while opening it lands in the
/// cache. The cache is then serialized to the cache file in the index directory,
/// where a later `open_with_cache` can load it to avoid cold-start reads.
///
/// Currently the only warmup is the open itself (segment opening reads the
/// essential metadata); iterating terms or running sample queries would warm
/// more data but is not done here.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let index = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;

    // Persist everything that was read while opening.
    index.save_slice_cache().await
}
587
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    /// Create another handle onto the same index.
    ///
    /// The directory, schema, tokenizers and thread pool are shared with the
    /// original. The segment list is copied as it is right now, so a later
    /// `reload` on one handle does not affect the other. The clone starts with
    /// a new, separate global stats cache.
    fn clone(&self) -> Self {
        let segments_snapshot = self.segments.read().clone();
        let fresh_stats = crate::query::GlobalStatsCache::new();

        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(segments_snapshot),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            global_stats: fresh_stats,
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
603
604#[cfg(test)]
605mod tests {
606    use super::*;
607    use crate::directories::RamDirectory;
608    use crate::dsl::SchemaBuilder;
609
610    #[tokio::test]
611    async fn test_index_create_and_search() {
612        let mut schema_builder = SchemaBuilder::default();
613        let title = schema_builder.add_text_field("title", true, true);
614        let body = schema_builder.add_text_field("body", true, true);
615        let schema = schema_builder.build();
616
617        let dir = RamDirectory::new();
618        let config = IndexConfig::default();
619
620        // Create index and add documents
621        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
622            .await
623            .unwrap();
624
625        let mut doc1 = Document::new();
626        doc1.add_text(title, "Hello World");
627        doc1.add_text(body, "This is the first document");
628        writer.add_document(doc1).await.unwrap();
629
630        let mut doc2 = Document::new();
631        doc2.add_text(title, "Goodbye World");
632        doc2.add_text(body, "This is the second document");
633        writer.add_document(doc2).await.unwrap();
634
635        writer.commit().await.unwrap();
636
637        // Open for reading
638        let index = Index::open(dir, config).await.unwrap();
639        assert_eq!(index.num_docs(), 2);
640
641        // Check postings
642        let postings = index.get_postings(title, b"world").await.unwrap();
643        assert_eq!(postings.len(), 1); // One segment
644        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
645
646        // Retrieve document
647        let doc = index.doc(0).await.unwrap().unwrap();
648        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
649    }
650
651    #[tokio::test]
652    async fn test_multiple_segments() {
653        let mut schema_builder = SchemaBuilder::default();
654        let title = schema_builder.add_text_field("title", true, true);
655        let schema = schema_builder.build();
656
657        let dir = RamDirectory::new();
658        let config = IndexConfig {
659            max_docs_per_segment: 5, // Small segments for testing
660            ..Default::default()
661        };
662
663        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
664            .await
665            .unwrap();
666
667        // Add documents in batches to create multiple segments
668        for batch in 0..3 {
669            for i in 0..5 {
670                let mut doc = Document::new();
671                doc.add_text(title, format!("Document {} batch {}", i, batch));
672                writer.add_document(doc).await.unwrap();
673            }
674            writer.commit().await.unwrap();
675        }
676
677        // Open and check
678        let index = Index::open(dir, config).await.unwrap();
679        assert_eq!(index.num_docs(), 15);
680        assert_eq!(index.segment_readers().len(), 3);
681    }
682
683    #[tokio::test]
684    async fn test_segment_merge() {
685        let mut schema_builder = SchemaBuilder::default();
686        let title = schema_builder.add_text_field("title", true, true);
687        let schema = schema_builder.build();
688
689        let dir = RamDirectory::new();
690        let config = IndexConfig {
691            max_docs_per_segment: 3,
692            ..Default::default()
693        };
694
695        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
696            .await
697            .unwrap();
698
699        // Create multiple segments
700        for i in 0..9 {
701            let mut doc = Document::new();
702            doc.add_text(title, format!("Document {}", i));
703            writer.add_document(doc).await.unwrap();
704        }
705        writer.commit().await.unwrap();
706
707        // Should have 3 segments
708        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
709        assert_eq!(index.segment_readers().len(), 3);
710
711        // Force merge
712        let writer = IndexWriter::open(dir.clone(), config.clone())
713            .await
714            .unwrap();
715        writer.force_merge().await.unwrap();
716
717        // Should have 1 segment now
718        let index = Index::open(dir, config).await.unwrap();
719        assert_eq!(index.segment_readers().len(), 1);
720        assert_eq!(index.num_docs(), 9);
721
722        // Verify all documents accessible
723        for i in 0..9 {
724            let doc = index.doc(i).await.unwrap().unwrap();
725            assert_eq!(
726                doc.get_first(title).unwrap().as_text(),
727                Some(format!("Document {}", i).as_str())
728            );
729        }
730    }
731
732    #[tokio::test]
733    async fn test_match_query() {
734        let mut schema_builder = SchemaBuilder::default();
735        let title = schema_builder.add_text_field("title", true, true);
736        let body = schema_builder.add_text_field("body", true, true);
737        let schema = schema_builder.build();
738
739        let dir = RamDirectory::new();
740        let config = IndexConfig::default();
741
742        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
743            .await
744            .unwrap();
745
746        let mut doc1 = Document::new();
747        doc1.add_text(title, "rust programming");
748        doc1.add_text(body, "Learn rust language");
749        writer.add_document(doc1).await.unwrap();
750
751        let mut doc2 = Document::new();
752        doc2.add_text(title, "python programming");
753        doc2.add_text(body, "Learn python language");
754        writer.add_document(doc2).await.unwrap();
755
756        writer.commit().await.unwrap();
757
758        let index = Index::open(dir, config).await.unwrap();
759
760        // Test match query with multiple default fields
761        let results = index.query("rust", 10).await.unwrap();
762        assert_eq!(results.hits.len(), 1);
763
764        // Test match query with multiple tokens
765        let results = index.query("rust programming", 10).await.unwrap();
766        assert!(!results.hits.is_empty());
767
768        // Verify hit has address (segment_id + doc_id)
769        let hit = &results.hits[0];
770        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
771
772        // Verify document retrieval by address
773        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
774        assert!(
775            !doc.field_values().is_empty(),
776            "Doc should have field values"
777        );
778
779        // Also verify doc retrieval directly by global doc_id
780        let doc = index.doc(0).await.unwrap().unwrap();
781        assert!(
782            !doc.field_values().is_empty(),
783            "Doc should have field values"
784        );
785    }
786
787    #[tokio::test]
788    async fn test_slice_cache_warmup_and_load() {
789        use crate::directories::SliceCachingDirectory;
790
791        let mut schema_builder = SchemaBuilder::default();
792        let title = schema_builder.add_text_field("title", true, true);
793        let body = schema_builder.add_text_field("body", true, true);
794        let schema = schema_builder.build();
795
796        let dir = RamDirectory::new();
797        let config = IndexConfig::default();
798
799        // Create index with some documents
800        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
801            .await
802            .unwrap();
803
804        for i in 0..10 {
805            let mut doc = Document::new();
806            doc.add_text(title, format!("Document {} about rust", i));
807            doc.add_text(body, format!("This is body text number {}", i));
808            writer.add_document(doc).await.unwrap();
809        }
810        writer.commit().await.unwrap();
811
812        // Open with slice caching and perform some operations to warm up cache
813        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
814        let index = Index::open(caching_dir, config.clone()).await.unwrap();
815
816        // Perform a search to warm up the cache
817        let results = index.query("rust", 10).await.unwrap();
818        assert!(!results.hits.is_empty());
819
820        // Check cache stats - should have cached some data
821        let stats = index.slice_cache_stats();
822        assert!(stats.total_bytes > 0, "Cache should have data after search");
823
824        // Save the cache
825        index.save_slice_cache().await.unwrap();
826
827        // Verify cache file was written
828        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());
829
830        // Now open with cache loading
831        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
832            .await
833            .unwrap();
834
835        // Cache should be prefilled
836        let stats2 = index2.slice_cache_stats();
837        assert!(
838            stats2.total_bytes > 0,
839            "Cache should be prefilled from file"
840        );
841
842        // Search should still work
843        let results2 = index2.query("rust", 10).await.unwrap();
844        assert_eq!(results.hits.len(), results2.hits.len());
845    }
846
847    #[tokio::test]
848    async fn test_multivalue_field_indexing_and_search() {
849        let mut schema_builder = SchemaBuilder::default();
850        let uris = schema_builder.add_text_field("uris", true, true);
851        let title = schema_builder.add_text_field("title", true, true);
852        let schema = schema_builder.build();
853
854        let dir = RamDirectory::new();
855        let config = IndexConfig::default();
856
857        // Create index and add document with multi-value field
858        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
859            .await
860            .unwrap();
861
862        let mut doc = Document::new();
863        doc.add_text(uris, "one");
864        doc.add_text(uris, "two");
865        doc.add_text(title, "Test Document");
866        writer.add_document(doc).await.unwrap();
867
868        // Add another document with different uris
869        let mut doc2 = Document::new();
870        doc2.add_text(uris, "three");
871        doc2.add_text(title, "Another Document");
872        writer.add_document(doc2).await.unwrap();
873
874        writer.commit().await.unwrap();
875
876        // Open for reading
877        let index = Index::open(dir, config).await.unwrap();
878        assert_eq!(index.num_docs(), 2);
879
880        // Verify document retrieval preserves all values
881        let doc = index.doc(0).await.unwrap().unwrap();
882        let all_uris: Vec<_> = doc.get_all(uris).collect();
883        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
884        assert_eq!(all_uris[0].as_text(), Some("one"));
885        assert_eq!(all_uris[1].as_text(), Some("two"));
886
887        // Verify to_json returns array for multi-value field
888        let json = doc.to_json(index.schema());
889        let uris_json = json.get("uris").unwrap();
890        assert!(uris_json.is_array(), "Multi-value field should be an array");
891        let uris_arr = uris_json.as_array().unwrap();
892        assert_eq!(uris_arr.len(), 2);
893        assert_eq!(uris_arr[0].as_str(), Some("one"));
894        assert_eq!(uris_arr[1].as_str(), Some("two"));
895
896        // Verify both values are searchable
897        let results = index.query("uris:one", 10).await.unwrap();
898        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
899        assert_eq!(results.hits[0].address.doc_id, 0);
900
901        let results = index.query("uris:two", 10).await.unwrap();
902        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
903        assert_eq!(results.hits[0].address.doc_id, 0);
904
905        let results = index.query("uris:three", 10).await.unwrap();
906        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
907        assert_eq!(results.hits[0].address.doc_id, 1);
908
909        // Verify searching for non-existent value returns no results
910        let results = index.query("uris:nonexistent", 10).await.unwrap();
911        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
912    }
913
914    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
915    ///
916    /// This test verifies that:
917    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
918    /// 2. Search results are correct regardless of WAND optimization
919    /// 3. Scores are reasonable for matching documents
920    #[tokio::test]
921    async fn test_wand_optimization_for_or_queries() {
922        use crate::query::{BooleanQuery, TermQuery};
923
924        let mut schema_builder = SchemaBuilder::default();
925        let content = schema_builder.add_text_field("content", true, true);
926        let schema = schema_builder.build();
927
928        let dir = RamDirectory::new();
929        let config = IndexConfig::default();
930
931        // Create index with documents containing various terms
932        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
933            .await
934            .unwrap();
935
936        // Doc 0: contains "rust" and "programming"
937        let mut doc = Document::new();
938        doc.add_text(content, "rust programming language is fast");
939        writer.add_document(doc).await.unwrap();
940
941        // Doc 1: contains "rust" only
942        let mut doc = Document::new();
943        doc.add_text(content, "rust is a systems language");
944        writer.add_document(doc).await.unwrap();
945
946        // Doc 2: contains "programming" only
947        let mut doc = Document::new();
948        doc.add_text(content, "programming is fun");
949        writer.add_document(doc).await.unwrap();
950
951        // Doc 3: contains "python" (neither rust nor programming)
952        let mut doc = Document::new();
953        doc.add_text(content, "python is easy to learn");
954        writer.add_document(doc).await.unwrap();
955
956        // Doc 4: contains both "rust" and "programming" multiple times
957        let mut doc = Document::new();
958        doc.add_text(content, "rust rust programming programming systems");
959        writer.add_document(doc).await.unwrap();
960
961        writer.commit().await.unwrap();
962
963        // Open for reading
964        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
965
966        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
967        let or_query = BooleanQuery::new()
968            .should(TermQuery::text(content, "rust"))
969            .should(TermQuery::text(content, "programming"));
970
971        let results = index.search(&or_query, 10).await.unwrap();
972
973        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
974        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
975
976        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
977        assert!(doc_ids.contains(&0), "Should find doc 0");
978        assert!(doc_ids.contains(&1), "Should find doc 1");
979        assert!(doc_ids.contains(&2), "Should find doc 2");
980        assert!(doc_ids.contains(&4), "Should find doc 4");
981        assert!(
982            !doc_ids.contains(&3),
983            "Should NOT find doc 3 (only has 'python')"
984        );
985
986        // Test 2: Single term query (should NOT use WAND, but still work)
987        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
988
989        let results = index.search(&single_query, 10).await.unwrap();
990        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
991
992        // Test 3: Query with MUST (should NOT use WAND)
993        let must_query = BooleanQuery::new()
994            .must(TermQuery::text(content, "rust"))
995            .should(TermQuery::text(content, "programming"));
996
997        let results = index.search(&must_query, 10).await.unwrap();
998        // Must have "rust", optionally "programming"
999        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1000
1001        // Test 4: Query with MUST_NOT (should NOT use WAND)
1002        let must_not_query = BooleanQuery::new()
1003            .should(TermQuery::text(content, "rust"))
1004            .should(TermQuery::text(content, "programming"))
1005            .must_not(TermQuery::text(content, "systems"));
1006
1007        let results = index.search(&must_not_query, 10).await.unwrap();
1008        // Should exclude docs with "systems" (doc 1 and 4)
1009        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1010        assert!(
1011            !doc_ids.contains(&1),
1012            "Should NOT find doc 1 (has 'systems')"
1013        );
1014        assert!(
1015            !doc_ids.contains(&4),
1016            "Should NOT find doc 4 (has 'systems')"
1017        );
1018
1019        // Test 5: Verify top-k limit works correctly with WAND
1020        let or_query = BooleanQuery::new()
1021            .should(TermQuery::text(content, "rust"))
1022            .should(TermQuery::text(content, "programming"));
1023
1024        let results = index.search(&or_query, 2).await.unwrap();
1025        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
1026
1027        // Top results should be docs that match both terms (higher scores)
1028        // Doc 0 and 4 contain both "rust" and "programming"
1029    }
1030
1031    /// Test that WAND optimization produces same results as non-WAND for correctness
1032    #[tokio::test]
1033    async fn test_wand_results_match_standard_boolean() {
1034        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
1035
1036        let mut schema_builder = SchemaBuilder::default();
1037        let content = schema_builder.add_text_field("content", true, true);
1038        let schema = schema_builder.build();
1039
1040        let dir = RamDirectory::new();
1041        let config = IndexConfig::default();
1042
1043        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1044            .await
1045            .unwrap();
1046
1047        // Add several documents
1048        for i in 0..10 {
1049            let mut doc = Document::new();
1050            let text = match i % 4 {
1051                0 => "apple banana cherry",
1052                1 => "apple orange",
1053                2 => "banana grape",
1054                _ => "cherry date",
1055            };
1056            doc.add_text(content, text);
1057            writer.add_document(doc).await.unwrap();
1058        }
1059
1060        writer.commit().await.unwrap();
1061        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1062
1063        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
1064        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
1065
1066        let bool_query = BooleanQuery::new()
1067            .should(TermQuery::text(content, "apple"))
1068            .should(TermQuery::text(content, "banana"));
1069
1070        let wand_results = index.search(&wand_query, 10).await.unwrap();
1071        let bool_results = index.search(&bool_query, 10).await.unwrap();
1072
1073        // Both should find the same documents
1074        assert_eq!(
1075            wand_results.hits.len(),
1076            bool_results.hits.len(),
1077            "WAND and Boolean should find same number of docs"
1078        );
1079
1080        let wand_docs: std::collections::HashSet<u32> =
1081            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
1082        let bool_docs: std::collections::HashSet<u32> =
1083            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
1084
1085        assert_eq!(
1086            wand_docs, bool_docs,
1087            "WAND and Boolean should find same documents"
1088        );
1089    }
1090}