hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! Components:
4//! - Index: main entry point for searching
5//! - IndexWriter: for adding documents and committing segments (native only)
6//! - Supports multiple segments with merge
7
8use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12
13use crate::DocId;
14use crate::directories::{Directory, SliceCachingDirectory};
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{SegmentId, SegmentReader};
18use crate::structures::BlockPostingList;
19
20#[cfg(feature = "native")]
21mod writer;
22#[cfg(feature = "native")]
23pub use writer::IndexWriter;
24
25#[cfg(feature = "native")]
26mod helpers;
27#[cfg(feature = "native")]
28pub use helpers::{
29    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
30    index_documents_from_reader, index_json_document, parse_schema,
31};
32
/// Default file name for the slice cache.
///
/// `Index::open_with_cache` looks for a file with this name in the wrapped
/// directory, and `Index::save_slice_cache` writes the serialized cache to it.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
35
/// Index configuration
///
/// Passed to `Index::open` (and, in this file's tests, to the index writer).
/// The `Default` implementation sizes the thread counts from the CPU count on
/// native builds and falls back to a single thread otherwise.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism).
    /// `Index::open` uses this value to size its rayon thread pool.
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment.
    /// Forwarded to `SegmentReader::open` when segments are loaded.
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max documents per segment before auto-commit
    pub max_docs_per_segment: u32,
    /// Merge policy for background segment merging
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
}
56
57impl Default for IndexConfig {
58    fn default() -> Self {
59        #[cfg(feature = "native")]
60        let cpus = num_cpus::get().max(1);
61        #[cfg(not(feature = "native"))]
62        let cpus = 1;
63
64        Self {
65            num_threads: cpus,
66            num_indexing_threads: 1,
67            num_compression_threads: cpus,
68            term_cache_blocks: 256,
69            store_cache_blocks: 32,
70            max_docs_per_segment: 100_000,
71            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
72            optimization: crate::structures::IndexOptimization::default(),
73        }
74    }
75}
76
/// Multi-segment async Index
///
/// The main entry point for searching. Manages multiple segments
/// and provides unified search across all of them.
pub struct Index<D: Directory> {
    /// Storage backend the schema, segment list and segments are read from.
    directory: Arc<D>,
    /// Schema deserialized from `schema.json` when the index was opened.
    schema: Arc<Schema>,
    /// Configuration supplied by the caller at open time.
    config: IndexConfig,
    /// Current set of open segment readers; replaced wholesale by `reload`.
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    /// Fields searched when a query does not name one.
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry handed to query parsers created from this index.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Cached global statistics for cross-segment IDF computation
    global_stats: crate::query::GlobalStatsCache,
    /// Rayon pool used by `spawn_blocking` (native builds only).
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
93
94impl<D: Directory> Index<D> {
95    /// Open an existing index from a directory
96    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
97        let directory = Arc::new(directory);
98
99        // Read schema
100        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
101        let schema_bytes = schema_slice.read_bytes().await?;
102        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
103            .map_err(|e| Error::Serialization(e.to_string()))?;
104        let schema = Arc::new(schema);
105
106        // Read segment list
107        let segments = Self::load_segments(&directory, &schema, &config).await?;
108
109        #[cfg(feature = "native")]
110        let thread_pool = {
111            let pool = rayon::ThreadPoolBuilder::new()
112                .num_threads(config.num_threads)
113                .build()
114                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
115            Arc::new(pool)
116        };
117
118        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
119        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
120            schema.default_fields().to_vec()
121        } else {
122            schema
123                .fields()
124                .filter(|(_, entry)| {
125                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
126                })
127                .map(|(field, _)| field)
128                .collect()
129        };
130
131        Ok(Self {
132            directory,
133            schema,
134            config,
135            segments: RwLock::new(segments),
136            default_fields,
137            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
138            global_stats: crate::query::GlobalStatsCache::new(),
139            #[cfg(feature = "native")]
140            thread_pool,
141        })
142    }
143
144    async fn load_segments(
145        directory: &Arc<D>,
146        schema: &Arc<Schema>,
147        config: &IndexConfig,
148    ) -> Result<Vec<Arc<SegmentReader>>> {
149        // Read segments.json which lists all segment IDs
150        let segments_path = Path::new("segments.json");
151        if !directory.exists(segments_path).await? {
152            return Ok(Vec::new());
153        }
154
155        let segments_slice = directory.open_read(segments_path).await?;
156        let segments_bytes = segments_slice.read_bytes().await?;
157        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
158            .map_err(|e| Error::Serialization(e.to_string()))?;
159
160        let mut segments = Vec::new();
161        let mut doc_id_offset = 0u32;
162
163        for id_str in segment_ids {
164            let segment_id = SegmentId::from_hex(&id_str)
165                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
166            let reader = SegmentReader::open(
167                directory.as_ref(),
168                segment_id,
169                Arc::clone(schema),
170                doc_id_offset,
171                config.term_cache_blocks,
172            )
173            .await?;
174
175            doc_id_offset += reader.meta().num_docs;
176            segments.push(Arc::new(reader));
177        }
178
179        Ok(segments)
180    }
181
182    /// Get the schema
183    pub fn schema(&self) -> &Schema {
184        &self.schema
185    }
186
187    /// Get a reference to the underlying directory
188    pub fn directory(&self) -> &D {
189        &self.directory
190    }
191
192    /// Total number of documents across all segments
193    pub fn num_docs(&self) -> u32 {
194        self.segments.read().iter().map(|s| s.num_docs()).sum()
195    }
196
197    /// Get a document by global doc_id (async)
198    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
199        let segments = self.segments.read().clone();
200
201        let mut offset = 0u32;
202        for segment in segments.iter() {
203            let segment_docs = segment.meta().num_docs;
204            if doc_id < offset + segment_docs {
205                let local_doc_id = doc_id - offset;
206                return segment.doc(local_doc_id).await;
207            }
208            offset += segment_docs;
209        }
210
211        Ok(None)
212    }
213
214    /// Get posting lists for a term across all segments (async)
215    pub async fn get_postings(
216        &self,
217        field: Field,
218        term: &[u8],
219    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
220        let segments = self.segments.read().clone();
221        let mut results = Vec::new();
222
223        for segment in segments.iter() {
224            if let Some(postings) = segment.get_postings(field, term).await? {
225                results.push((Arc::clone(segment), postings));
226            }
227        }
228
229        Ok(results)
230    }
231
232    /// Execute CPU-intensive work on thread pool (native only)
233    #[cfg(feature = "native")]
234    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
235    where
236        F: FnOnce() -> R + Send + 'static,
237        R: Send + 'static,
238    {
239        let (tx, rx) = tokio::sync::oneshot::channel();
240        self.thread_pool.spawn(move || {
241            let result = f();
242            let _ = tx.send(result);
243        });
244        rx.await.expect("Thread pool task panicked")
245    }
246
247    /// Get segment readers for query execution
248    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
249        self.segments.read().clone()
250    }
251
252    /// Reload segments from directory (after new segments added)
253    pub async fn reload(&self) -> Result<()> {
254        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
255        *self.segments.write() = new_segments;
256        // Invalidate global stats cache since segments changed
257        self.global_stats.invalidate();
258        Ok(())
259    }
260
261    /// Get global statistics for cross-segment IDF computation (sync, basic stats only)
262    ///
263    /// Returns cached stats if available. For full stats including term frequencies,
264    /// call `build_global_stats().await` first.
265    ///
266    /// This sync version only includes:
267    /// - Total docs
268    /// - Sparse vector dimension document frequencies
269    /// - Average field lengths
270    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
271        self.global_stats.get()
272    }
273
274    /// Build and cache global statistics (async, includes term frequencies)
275    ///
276    /// This iterates term dictionaries across all segments to compute
277    /// accurate cross-segment IDF values for full-text queries.
278    ///
279    /// Call this once after opening the index or after reload().
280    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
281        // Return cached if available
282        if let Some(stats) = self.global_stats.get() {
283            return Ok(stats);
284        }
285
286        let segments = self.segments.read().clone();
287        let schema = &self.schema;
288        let mut builder = crate::query::GlobalStatsBuilder::new();
289
290        // Track field length sums for computing global avg
291        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
292            rustc_hash::FxHashMap::default();
293
294        for segment in &segments {
295            let num_docs = segment.num_docs() as u64;
296            builder.total_docs += num_docs;
297
298            // Aggregate sparse vector statistics
299            for (&field_id, sparse_index) in segment.sparse_indexes() {
300                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
301                    if let Some(pl) = posting_list {
302                        builder.add_sparse_df(
303                            crate::dsl::Field(field_id),
304                            dim_id as u32,
305                            pl.doc_count() as u64,
306                        );
307                    }
308                }
309            }
310
311            // Aggregate text field average lengths
312            for (field, entry) in schema.fields() {
313                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
314                    let avg_len = segment.avg_field_len(field);
315                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
316                    *sum += (avg_len * num_docs as f32) as u64;
317                    *count += num_docs;
318                }
319            }
320
321            // Iterate term dictionary to get term document frequencies
322            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
323                builder.add_text_df(field, term, doc_freq as u64);
324            }
325        }
326
327        // Set global average field lengths
328        for (field_id, (sum, count)) in field_len_sums {
329            if count > 0 {
330                let global_avg = sum as f32 / count as f32;
331                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
332            }
333        }
334
335        let generation = self.global_stats.generation();
336        let stats = builder.build(generation);
337        self.global_stats.set_stats(stats);
338
339        Ok(self.global_stats.get().unwrap())
340    }
341
342    /// Search and return results with document addresses (no document content)
343    ///
344    /// This is the primary search method. Use `get_document` to fetch document content.
345    pub async fn search(
346        &self,
347        query: &dyn crate::query::Query,
348        limit: usize,
349    ) -> Result<crate::query::SearchResponse> {
350        self.search_offset(query, limit, 0).await
351    }
352
353    /// Search with offset for pagination
354    pub async fn search_offset(
355        &self,
356        query: &dyn crate::query::Query,
357        limit: usize,
358        offset: usize,
359    ) -> Result<crate::query::SearchResponse> {
360        let segments = self.segments.read().clone();
361        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
362
363        // Fetch enough results to cover offset + limit
364        let fetch_limit = offset + limit;
365        for segment in &segments {
366            let segment_id = segment.meta().id;
367            let results =
368                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?;
369            for result in results {
370                all_results.push((segment_id, result));
371            }
372        }
373
374        // Sort by score descending
375        all_results.sort_by(|a, b| {
376            b.1.score
377                .partial_cmp(&a.1.score)
378                .unwrap_or(std::cmp::Ordering::Equal)
379        });
380
381        // Total hits before pagination
382        let total_hits = all_results.len() as u32;
383
384        // Apply offset and limit
385        let hits: Vec<crate::query::SearchHit> = all_results
386            .into_iter()
387            .skip(offset)
388            .take(limit)
389            .map(|(segment_id, result)| crate::query::SearchHit {
390                address: crate::query::DocAddress::new(segment_id, result.doc_id),
391                score: result.score,
392            })
393            .collect();
394
395        Ok(crate::query::SearchResponse { hits, total_hits })
396    }
397
398    /// Get a document by its unique address (segment_id + local doc_id)
399    pub async fn get_document(
400        &self,
401        address: &crate::query::DocAddress,
402    ) -> Result<Option<Document>> {
403        let segment_id = address
404            .segment_id_u128()
405            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
406
407        let segments = self.segments.read().clone();
408        for segment in &segments {
409            if segment.meta().id == segment_id {
410                return segment.doc(address.doc_id).await;
411            }
412        }
413
414        Ok(None)
415    }
416
417    /// Get the default fields for this index
418    pub fn default_fields(&self) -> &[crate::Field] {
419        &self.default_fields
420    }
421
422    /// Set the default fields for query parsing
423    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
424        self.default_fields = fields;
425    }
426
427    /// Get the tokenizer registry
428    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
429        &self.tokenizers
430    }
431
432    /// Create a query parser for this index
433    ///
434    /// If the schema contains query router rules, they will be used to route
435    /// queries to specific fields based on regex patterns.
436    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
437        // Check if schema has query routers
438        let query_routers = self.schema.query_routers();
439        if !query_routers.is_empty() {
440            // Try to create a router from the schema's rules
441            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
442                return crate::dsl::QueryLanguageParser::with_router(
443                    Arc::clone(&self.schema),
444                    self.default_fields.clone(),
445                    Arc::clone(&self.tokenizers),
446                    router,
447                );
448            }
449        }
450
451        // Fall back to parser without router
452        crate::dsl::QueryLanguageParser::new(
453            Arc::clone(&self.schema),
454            self.default_fields.clone(),
455            Arc::clone(&self.tokenizers),
456        )
457    }
458
459    /// Parse and search using a query string
460    ///
461    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
462    /// and simple text (tokenized and searched across default fields).
463    /// Returns document addresses (segment_id + doc_id) without document content.
464    pub async fn query(
465        &self,
466        query_str: &str,
467        limit: usize,
468    ) -> Result<crate::query::SearchResponse> {
469        self.query_offset(query_str, limit, 0).await
470    }
471
472    /// Query with offset for pagination
473    pub async fn query_offset(
474        &self,
475        query_str: &str,
476        limit: usize,
477        offset: usize,
478    ) -> Result<crate::query::SearchResponse> {
479        let parser = self.query_parser();
480        let query = parser.parse(query_str).map_err(Error::Query)?;
481        self.search_offset(query.as_ref(), limit, offset).await
482    }
483}
484
485/// Methods for opening index with slice caching
486impl<D: Directory> Index<SliceCachingDirectory<D>> {
487    /// Open an index with slice caching, automatically loading the cache file if present
488    ///
489    /// This wraps the directory in a SliceCachingDirectory and attempts to load
490    /// any existing slice cache file to prefill the cache with hot data.
491    pub async fn open_with_cache(
492        directory: D,
493        config: IndexConfig,
494        cache_max_bytes: usize,
495    ) -> Result<Self> {
496        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
497
498        // Try to load existing slice cache
499        let cache_path = Path::new(SLICE_CACHE_FILENAME);
500        if let Ok(true) = caching_dir.inner().exists(cache_path).await
501            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
502            && let Ok(bytes) = slice.read_bytes().await
503        {
504            let _ = caching_dir.deserialize(bytes.as_slice());
505        }
506
507        Self::open(caching_dir, config).await
508    }
509
510    /// Serialize the current slice cache to the index directory
511    ///
512    /// This saves all cached slices to a single file that can be loaded
513    /// on subsequent index opens for faster startup.
514    #[cfg(feature = "native")]
515    pub async fn save_slice_cache(&self) -> Result<()>
516    where
517        D: crate::directories::DirectoryWriter,
518    {
519        let cache_data = self.directory.serialize();
520        let cache_path = Path::new(SLICE_CACHE_FILENAME);
521        self.directory
522            .inner()
523            .write(cache_path, &cache_data)
524            .await?;
525        Ok(())
526    }
527
528    /// Get slice cache statistics
529    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
530        self.directory.stats()
531    }
532}
533
/// Open an index through a slice cache and persist whatever the open cached.
///
/// The directory is wrapped in a `SliceCachingDirectory` limited to
/// `cache_max_bytes`, the index is opened through it, and the resulting cache
/// is written to the slice cache file so later opens can prefill from it.
///
/// No reads beyond opening the index are performed here; further warmup
/// (iterating terms, running sample queries) would have to be added before
/// the save.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let index = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;

    // Persist everything that opening the index pulled into the cache.
    index.save_slice_cache().await
}
560
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    /// Create another handle onto the same index.
    ///
    /// The directory, schema, tokenizers and thread pool are shared with the
    /// original. The segment list is copied as it is at the time of the call,
    /// and the clone starts with a fresh, empty global-stats cache.
    fn clone(&self) -> Self {
        let directory = Arc::clone(&self.directory);
        let schema = Arc::clone(&self.schema);
        let config = self.config.clone();
        let segments = RwLock::new(self.segments.read().clone());
        let default_fields = self.default_fields.clone();
        let tokenizers = Arc::clone(&self.tokenizers);
        let global_stats = crate::query::GlobalStatsCache::new();
        let thread_pool = Arc::clone(&self.thread_pool);

        Self {
            directory,
            schema,
            config,
            segments,
            default_fields,
            tokenizers,
            global_stats,
            thread_pool,
        }
    }
}
576
577#[cfg(test)]
578mod tests {
579    use super::*;
580    use crate::directories::RamDirectory;
581    use crate::dsl::SchemaBuilder;
582
583    #[tokio::test]
584    async fn test_index_create_and_search() {
585        let mut schema_builder = SchemaBuilder::default();
586        let title = schema_builder.add_text_field("title", true, true);
587        let body = schema_builder.add_text_field("body", true, true);
588        let schema = schema_builder.build();
589
590        let dir = RamDirectory::new();
591        let config = IndexConfig::default();
592
593        // Create index and add documents
594        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
595            .await
596            .unwrap();
597
598        let mut doc1 = Document::new();
599        doc1.add_text(title, "Hello World");
600        doc1.add_text(body, "This is the first document");
601        writer.add_document(doc1).await.unwrap();
602
603        let mut doc2 = Document::new();
604        doc2.add_text(title, "Goodbye World");
605        doc2.add_text(body, "This is the second document");
606        writer.add_document(doc2).await.unwrap();
607
608        writer.commit().await.unwrap();
609
610        // Open for reading
611        let index = Index::open(dir, config).await.unwrap();
612        assert_eq!(index.num_docs(), 2);
613
614        // Check postings
615        let postings = index.get_postings(title, b"world").await.unwrap();
616        assert_eq!(postings.len(), 1); // One segment
617        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
618
619        // Retrieve document
620        let doc = index.doc(0).await.unwrap().unwrap();
621        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
622    }
623
624    #[tokio::test]
625    async fn test_multiple_segments() {
626        let mut schema_builder = SchemaBuilder::default();
627        let title = schema_builder.add_text_field("title", true, true);
628        let schema = schema_builder.build();
629
630        let dir = RamDirectory::new();
631        let config = IndexConfig {
632            max_docs_per_segment: 5, // Small segments for testing
633            ..Default::default()
634        };
635
636        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
637            .await
638            .unwrap();
639
640        // Add documents in batches to create multiple segments
641        for batch in 0..3 {
642            for i in 0..5 {
643                let mut doc = Document::new();
644                doc.add_text(title, format!("Document {} batch {}", i, batch));
645                writer.add_document(doc).await.unwrap();
646            }
647            writer.commit().await.unwrap();
648        }
649
650        // Open and check
651        let index = Index::open(dir, config).await.unwrap();
652        assert_eq!(index.num_docs(), 15);
653        assert_eq!(index.segment_readers().len(), 3);
654    }
655
656    #[tokio::test]
657    async fn test_segment_merge() {
658        let mut schema_builder = SchemaBuilder::default();
659        let title = schema_builder.add_text_field("title", true, true);
660        let schema = schema_builder.build();
661
662        let dir = RamDirectory::new();
663        let config = IndexConfig {
664            max_docs_per_segment: 3,
665            ..Default::default()
666        };
667
668        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
669            .await
670            .unwrap();
671
672        // Create multiple segments
673        for i in 0..9 {
674            let mut doc = Document::new();
675            doc.add_text(title, format!("Document {}", i));
676            writer.add_document(doc).await.unwrap();
677        }
678        writer.commit().await.unwrap();
679
680        // Should have 3 segments
681        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
682        assert_eq!(index.segment_readers().len(), 3);
683
684        // Force merge
685        let writer = IndexWriter::open(dir.clone(), config.clone())
686            .await
687            .unwrap();
688        writer.force_merge().await.unwrap();
689
690        // Should have 1 segment now
691        let index = Index::open(dir, config).await.unwrap();
692        assert_eq!(index.segment_readers().len(), 1);
693        assert_eq!(index.num_docs(), 9);
694
695        // Verify all documents accessible
696        for i in 0..9 {
697            let doc = index.doc(i).await.unwrap().unwrap();
698            assert_eq!(
699                doc.get_first(title).unwrap().as_text(),
700                Some(format!("Document {}", i).as_str())
701            );
702        }
703    }
704
705    #[tokio::test]
706    async fn test_match_query() {
707        let mut schema_builder = SchemaBuilder::default();
708        let title = schema_builder.add_text_field("title", true, true);
709        let body = schema_builder.add_text_field("body", true, true);
710        let schema = schema_builder.build();
711
712        let dir = RamDirectory::new();
713        let config = IndexConfig::default();
714
715        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
716            .await
717            .unwrap();
718
719        let mut doc1 = Document::new();
720        doc1.add_text(title, "rust programming");
721        doc1.add_text(body, "Learn rust language");
722        writer.add_document(doc1).await.unwrap();
723
724        let mut doc2 = Document::new();
725        doc2.add_text(title, "python programming");
726        doc2.add_text(body, "Learn python language");
727        writer.add_document(doc2).await.unwrap();
728
729        writer.commit().await.unwrap();
730
731        let index = Index::open(dir, config).await.unwrap();
732
733        // Test match query with multiple default fields
734        let results = index.query("rust", 10).await.unwrap();
735        assert_eq!(results.hits.len(), 1);
736
737        // Test match query with multiple tokens
738        let results = index.query("rust programming", 10).await.unwrap();
739        assert!(!results.hits.is_empty());
740
741        // Verify hit has address (segment_id + doc_id)
742        let hit = &results.hits[0];
743        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
744
745        // Verify document retrieval by address
746        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
747        assert!(
748            !doc.field_values().is_empty(),
749            "Doc should have field values"
750        );
751
752        // Also verify doc retrieval directly by global doc_id
753        let doc = index.doc(0).await.unwrap().unwrap();
754        assert!(
755            !doc.field_values().is_empty(),
756            "Doc should have field values"
757        );
758    }
759
760    #[tokio::test]
761    async fn test_slice_cache_warmup_and_load() {
762        use crate::directories::SliceCachingDirectory;
763
764        let mut schema_builder = SchemaBuilder::default();
765        let title = schema_builder.add_text_field("title", true, true);
766        let body = schema_builder.add_text_field("body", true, true);
767        let schema = schema_builder.build();
768
769        let dir = RamDirectory::new();
770        let config = IndexConfig::default();
771
772        // Create index with some documents
773        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
774            .await
775            .unwrap();
776
777        for i in 0..10 {
778            let mut doc = Document::new();
779            doc.add_text(title, format!("Document {} about rust", i));
780            doc.add_text(body, format!("This is body text number {}", i));
781            writer.add_document(doc).await.unwrap();
782        }
783        writer.commit().await.unwrap();
784
785        // Open with slice caching and perform some operations to warm up cache
786        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
787        let index = Index::open(caching_dir, config.clone()).await.unwrap();
788
789        // Perform a search to warm up the cache
790        let results = index.query("rust", 10).await.unwrap();
791        assert!(!results.hits.is_empty());
792
793        // Check cache stats - should have cached some data
794        let stats = index.slice_cache_stats();
795        assert!(stats.total_bytes > 0, "Cache should have data after search");
796
797        // Save the cache
798        index.save_slice_cache().await.unwrap();
799
800        // Verify cache file was written
801        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());
802
803        // Now open with cache loading
804        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
805            .await
806            .unwrap();
807
808        // Cache should be prefilled
809        let stats2 = index2.slice_cache_stats();
810        assert!(
811            stats2.total_bytes > 0,
812            "Cache should be prefilled from file"
813        );
814
815        // Search should still work
816        let results2 = index2.query("rust", 10).await.unwrap();
817        assert_eq!(results.hits.len(), results2.hits.len());
818    }
819
820    #[tokio::test]
821    async fn test_multivalue_field_indexing_and_search() {
822        let mut schema_builder = SchemaBuilder::default();
823        let uris = schema_builder.add_text_field("uris", true, true);
824        let title = schema_builder.add_text_field("title", true, true);
825        let schema = schema_builder.build();
826
827        let dir = RamDirectory::new();
828        let config = IndexConfig::default();
829
830        // Create index and add document with multi-value field
831        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
832            .await
833            .unwrap();
834
835        let mut doc = Document::new();
836        doc.add_text(uris, "one");
837        doc.add_text(uris, "two");
838        doc.add_text(title, "Test Document");
839        writer.add_document(doc).await.unwrap();
840
841        // Add another document with different uris
842        let mut doc2 = Document::new();
843        doc2.add_text(uris, "three");
844        doc2.add_text(title, "Another Document");
845        writer.add_document(doc2).await.unwrap();
846
847        writer.commit().await.unwrap();
848
849        // Open for reading
850        let index = Index::open(dir, config).await.unwrap();
851        assert_eq!(index.num_docs(), 2);
852
853        // Verify document retrieval preserves all values
854        let doc = index.doc(0).await.unwrap().unwrap();
855        let all_uris: Vec<_> = doc.get_all(uris).collect();
856        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
857        assert_eq!(all_uris[0].as_text(), Some("one"));
858        assert_eq!(all_uris[1].as_text(), Some("two"));
859
860        // Verify to_json returns array for multi-value field
861        let json = doc.to_json(index.schema());
862        let uris_json = json.get("uris").unwrap();
863        assert!(uris_json.is_array(), "Multi-value field should be an array");
864        let uris_arr = uris_json.as_array().unwrap();
865        assert_eq!(uris_arr.len(), 2);
866        assert_eq!(uris_arr[0].as_str(), Some("one"));
867        assert_eq!(uris_arr[1].as_str(), Some("two"));
868
869        // Verify both values are searchable
870        let results = index.query("uris:one", 10).await.unwrap();
871        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
872        assert_eq!(results.hits[0].address.doc_id, 0);
873
874        let results = index.query("uris:two", 10).await.unwrap();
875        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
876        assert_eq!(results.hits[0].address.doc_id, 0);
877
878        let results = index.query("uris:three", 10).await.unwrap();
879        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
880        assert_eq!(results.hits[0].address.doc_id, 1);
881
882        // Verify searching for non-existent value returns no results
883        let results = index.query("uris:nonexistent", 10).await.unwrap();
884        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
885    }
886
887    /// Comprehensive test for WAND optimization in BooleanQuery OR queries
888    ///
889    /// This test verifies that:
890    /// 1. BooleanQuery with multiple SHOULD term queries uses WAND automatically
891    /// 2. Search results are correct regardless of WAND optimization
892    /// 3. Scores are reasonable for matching documents
893    #[tokio::test]
894    async fn test_wand_optimization_for_or_queries() {
895        use crate::query::{BooleanQuery, TermQuery};
896
897        let mut schema_builder = SchemaBuilder::default();
898        let content = schema_builder.add_text_field("content", true, true);
899        let schema = schema_builder.build();
900
901        let dir = RamDirectory::new();
902        let config = IndexConfig::default();
903
904        // Create index with documents containing various terms
905        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
906            .await
907            .unwrap();
908
909        // Doc 0: contains "rust" and "programming"
910        let mut doc = Document::new();
911        doc.add_text(content, "rust programming language is fast");
912        writer.add_document(doc).await.unwrap();
913
914        // Doc 1: contains "rust" only
915        let mut doc = Document::new();
916        doc.add_text(content, "rust is a systems language");
917        writer.add_document(doc).await.unwrap();
918
919        // Doc 2: contains "programming" only
920        let mut doc = Document::new();
921        doc.add_text(content, "programming is fun");
922        writer.add_document(doc).await.unwrap();
923
924        // Doc 3: contains "python" (neither rust nor programming)
925        let mut doc = Document::new();
926        doc.add_text(content, "python is easy to learn");
927        writer.add_document(doc).await.unwrap();
928
929        // Doc 4: contains both "rust" and "programming" multiple times
930        let mut doc = Document::new();
931        doc.add_text(content, "rust rust programming programming systems");
932        writer.add_document(doc).await.unwrap();
933
934        writer.commit().await.unwrap();
935
936        // Open for reading
937        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
938
939        // Test 1: Pure OR query with multiple terms (should use WAND automatically)
940        let or_query = BooleanQuery::new()
941            .should(TermQuery::text(content, "rust"))
942            .should(TermQuery::text(content, "programming"));
943
944        let results = index.search(&or_query, 10).await.unwrap();
945
946        // Should find docs 0, 1, 2, 4 (all that contain "rust" OR "programming")
947        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
948
949        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
950        assert!(doc_ids.contains(&0), "Should find doc 0");
951        assert!(doc_ids.contains(&1), "Should find doc 1");
952        assert!(doc_ids.contains(&2), "Should find doc 2");
953        assert!(doc_ids.contains(&4), "Should find doc 4");
954        assert!(
955            !doc_ids.contains(&3),
956            "Should NOT find doc 3 (only has 'python')"
957        );
958
959        // Test 2: Single term query (should NOT use WAND, but still work)
960        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
961
962        let results = index.search(&single_query, 10).await.unwrap();
963        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
964
965        // Test 3: Query with MUST (should NOT use WAND)
966        let must_query = BooleanQuery::new()
967            .must(TermQuery::text(content, "rust"))
968            .should(TermQuery::text(content, "programming"));
969
970        let results = index.search(&must_query, 10).await.unwrap();
971        // Must have "rust", optionally "programming"
972        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
973
974        // Test 4: Query with MUST_NOT (should NOT use WAND)
975        let must_not_query = BooleanQuery::new()
976            .should(TermQuery::text(content, "rust"))
977            .should(TermQuery::text(content, "programming"))
978            .must_not(TermQuery::text(content, "systems"));
979
980        let results = index.search(&must_not_query, 10).await.unwrap();
981        // Should exclude docs with "systems" (doc 1 and 4)
982        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
983        assert!(
984            !doc_ids.contains(&1),
985            "Should NOT find doc 1 (has 'systems')"
986        );
987        assert!(
988            !doc_ids.contains(&4),
989            "Should NOT find doc 4 (has 'systems')"
990        );
991
992        // Test 5: Verify top-k limit works correctly with WAND
993        let or_query = BooleanQuery::new()
994            .should(TermQuery::text(content, "rust"))
995            .should(TermQuery::text(content, "programming"));
996
997        let results = index.search(&or_query, 2).await.unwrap();
998        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
999
1000        // Top results should be docs that match both terms (higher scores)
1001        // Doc 0 and 4 contain both "rust" and "programming"
1002    }
1003
1004    /// Test that WAND optimization produces same results as non-WAND for correctness
1005    #[tokio::test]
1006    async fn test_wand_results_match_standard_boolean() {
1007        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
1008
1009        let mut schema_builder = SchemaBuilder::default();
1010        let content = schema_builder.add_text_field("content", true, true);
1011        let schema = schema_builder.build();
1012
1013        let dir = RamDirectory::new();
1014        let config = IndexConfig::default();
1015
1016        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1017            .await
1018            .unwrap();
1019
1020        // Add several documents
1021        for i in 0..10 {
1022            let mut doc = Document::new();
1023            let text = match i % 4 {
1024                0 => "apple banana cherry",
1025                1 => "apple orange",
1026                2 => "banana grape",
1027                _ => "cherry date",
1028            };
1029            doc.add_text(content, text);
1030            writer.add_document(doc).await.unwrap();
1031        }
1032
1033        writer.commit().await.unwrap();
1034        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1035
1036        // Compare explicit WandOrQuery with auto-optimized BooleanQuery
1037        let wand_query = WandOrQuery::new(content).term("apple").term("banana");
1038
1039        let bool_query = BooleanQuery::new()
1040            .should(TermQuery::text(content, "apple"))
1041            .should(TermQuery::text(content, "banana"));
1042
1043        let wand_results = index.search(&wand_query, 10).await.unwrap();
1044        let bool_results = index.search(&bool_query, 10).await.unwrap();
1045
1046        // Both should find the same documents
1047        assert_eq!(
1048            wand_results.hits.len(),
1049            bool_results.hits.len(),
1050            "WAND and Boolean should find same number of docs"
1051        );
1052
1053        let wand_docs: std::collections::HashSet<u32> =
1054            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
1055        let bool_docs: std::collections::HashSet<u32> =
1056            bool_results.hits.iter().map(|h| h.address.doc_id).collect();
1057
1058        assert_eq!(
1059            wand_docs, bool_docs,
1060            "WAND and Boolean should find same documents"
1061        );
1062    }
1063}