hermes_core/index.rs

//! Index - multi-segment async search index
//!
//! Components:
//! - Index: main entry point for searching
//! - IndexWriter: for adding documents and committing segments
//! - Supports multiple segments with merging

use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

use crate::DocId;
#[cfg(feature = "native")]
use crate::directories::DirectoryWriter;
use crate::directories::{Directory, SliceCachingDirectory};
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
#[cfg(feature = "native")]
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentMerger};
use crate::segment::{SegmentId, SegmentReader};
use crate::structures::BlockPostingList;
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;

#[cfg(feature = "native")]
use tokio::sync::Mutex as AsyncMutex;

/// Default file name for the slice cache
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

/// Index configuration
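///
/// A minimal sketch of overriding a few fields while keeping the rest at
/// their defaults (values here are illustrative only):
///
/// ```ignore
/// let config = IndexConfig {
///     max_docs_per_segment: 50_000, // commit a segment every 50k documents
///     merge_threshold: 3,           // merge once three segments accumulate
///     ..Default::default()
/// };
/// ```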
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max documents per segment before auto-commit
    pub max_docs_per_segment: u32,
    /// Min segments to trigger merge
    pub merge_threshold: usize,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let cpus = num_cpus::get().max(1);
        #[cfg(not(feature = "native"))]
        let cpus = 1;

        Self {
            num_threads: cpus,
            num_indexing_threads: 1,
            num_compression_threads: cpus,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_docs_per_segment: 100_000,
            merge_threshold: 5,
            optimization: crate::structures::IndexOptimization::default(),
        }
    }
}

/// Multi-segment async Index
///
/// The main entry point for searching. Manages multiple segments
/// and provides unified search across all of them.
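///
/// A minimal usage sketch, assuming `dir` is a `Directory` that already
/// holds a committed index:
///
/// ```ignore
/// let index = Index::open(dir, IndexConfig::default()).await?;
/// let response = index.query("title:hello", 10).await?;
/// for hit in &response.hits {
///     // Each hit carries an address; the document itself is fetched lazily
///     let doc = index.get_document(&hit.address).await?;
/// }
/// ```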
pub struct Index<D: Directory> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<D: Directory> Index<D> {
    /// Open an existing index from a directory
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Read segment list
        let segments = Self::load_segments(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            #[cfg(feature = "native")]
            thread_pool,
        })
    }

    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Read segments.json which lists all segment IDs
        let segments_path = Path::new("segments.json");
        if !directory.exists(segments_path).await? {
            return Ok(Vec::new());
        }

        let segments_slice = directory.open_read(segments_path).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in segment_ids {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok(segments)
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get a reference to the underlying directory
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Total number of documents across all segments
    pub fn num_docs(&self) -> u32 {
        self.segments.read().iter().map(|s| s.num_docs()).sum()
    }

    /// Get a document by global doc_id (async)
    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
        let segments = self.segments.read().clone();

        let mut offset = 0u32;
        for segment in segments.iter() {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }

        Ok(None)
    }

    /// Get posting lists for a term across all segments (async)
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
        let segments = self.segments.read().clone();
        let mut results = Vec::new();

        for segment in segments.iter() {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((Arc::clone(segment), postings));
            }
        }

        Ok(results)
    }

    /// Execute CPU-intensive work on thread pool (native only)
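    ///
    /// A small sketch offloading a CPU-bound closure:
    ///
    /// ```ignore
    /// let sum = index.spawn_blocking(|| (0..1_000_000u64).sum::<u64>()).await;
    /// ```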
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }

    /// Get segment readers for query execution
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }

    /// Reload segments from the directory (after new segments have been added)
    pub async fn reload(&self) -> Result<()> {
        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
        *self.segments.write() = new_segments;
        Ok(())
    }

    /// Search across all segments
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let segments = self.segments.read().clone();
        let mut all_results = Vec::new();

        for segment in &segments {
            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
            all_results.extend(results);
        }

        // Sort by score descending
        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_results.truncate(limit);

        Ok(all_results)
    }

    /// Search and return results with document addresses (no document content)
    pub async fn search_with_addresses(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        let segments = self.segments.read().clone();
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        for segment in &segments {
            let segment_id = segment.meta().id;
            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        // Sort by score descending
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_results.truncate(limit);

        let total_hits = all_results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get a document by its unique address (segment_id + local doc_id)
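    ///
    /// A typical round trip takes the address from a search hit (sketch):
    ///
    /// ```ignore
    /// let response = index.query("hello", 10).await?;
    /// if let Some(hit) = response.hits.first() {
    ///     let doc = index.get_document(&hit.address).await?;
    /// }
    /// ```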
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<Document>> {
        let segment_id = address
            .segment_id_u128()
            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;

        let segments = self.segments.read().clone();
        for segment in &segments {
            if segment.meta().id == segment_id {
                return segment.doc(address.doc_id).await;
            }
        }

        Ok(None)
    }

    /// Get the default fields for this index
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Set the default fields for query parsing
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    /// Get the tokenizer registry
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }

    /// Create a query parser for this index
    ///
    /// If the schema contains query router rules, they will be used to route
    /// queries to specific fields based on regex patterns.
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        // Check if schema has query routers
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty() {
            // Try to create a router from the schema's rules
            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
                return crate::dsl::QueryLanguageParser::with_router(
                    Arc::clone(&self.schema),
                    self.default_fields.clone(),
                    Arc::clone(&self.tokenizers),
                    router,
                );
            }
        }

        // Fall back to parser without router
        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Parse and search using a query string
    ///
    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
    /// and simple text (tokenized and searched across default fields).
    /// Returns document addresses (segment_id + doc_id) without document content.
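    ///
    /// A sketch of both styles:
    ///
    /// ```ignore
    /// // Query language: field-qualified terms with boolean operators
    /// let response = index.query("title:rust AND body:async", 10).await?;
    /// // Plain text: tokenized and matched against the default fields
    /// let response = index.query("rust async", 10).await?;
    /// ```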
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser.parse(query_str).map_err(Error::Query)?;
        self.search_with_addresses(query.as_ref(), limit).await
    }
}

/// Methods for opening index with slice caching
impl<D: Directory> Index<SliceCachingDirectory<D>> {
    /// Open an index with slice caching, automatically loading the cache file if present
    ///
    /// This wraps the directory in a SliceCachingDirectory and attempts to load
    /// any existing slice cache file to prefill the cache with hot data.
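    ///
    /// A minimal sketch (the cache file is only loaded if one was saved earlier):
    ///
    /// ```ignore
    /// let index = Index::open_with_cache(dir, IndexConfig::default(), 64 * 1024 * 1024).await?;
    /// println!("cached bytes: {}", index.slice_cache_stats().total_bytes);
    /// ```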
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        // Try to load existing slice cache
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Serialize the current slice cache to the index directory
    ///
    /// This saves all cached slices to a single file that can be loaded
    /// on subsequent index opens for faster startup.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Get slice cache statistics
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}

/// Warm up the slice cache by opening an index and performing typical read operations
///
/// This function opens an index using a SliceCachingDirectory, performs operations
/// that would typically be done during search (reading term dictionaries, posting lists),
/// and then serializes the cache to a file for future use.
///
/// The resulting cache file contains all the "hot" data that was read during warmup,
/// allowing subsequent index opens to prefill the cache and avoid cold-start latency.
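///
/// A minimal sketch, assuming `dir` implements `DirectoryWriter` and is cheap
/// to clone (as `RamDirectory` is in the tests below):
///
/// ```ignore
/// warmup_and_save_slice_cache(dir.clone(), IndexConfig::default(), 64 * 1024 * 1024).await?;
/// // Later opens can prefill the cache from the saved file:
/// let index = Index::open_with_cache(dir, IndexConfig::default(), 64 * 1024 * 1024).await?;
/// ```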
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    // Warm up by loading segment metadata and term dictionaries
    // The SegmentReader::open already reads essential metadata
    // Additional warmup can be done by iterating terms or doing sample queries

    // Save the cache
    index.save_slice_cache().await?;

    Ok(())
}

#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
/// - Uses memory-mapped files for intermediate data
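///
/// A minimal sketch of the create / add / commit cycle (mirroring the tests
/// below; `dir`, `schema`, and the `title` field are assumed to be set up already):
///
/// ```ignore
/// let writer = IndexWriter::create(dir, schema, IndexConfig::default()).await?;
/// let mut doc = Document::new();
/// doc.add_text(title, "Hello World");
/// writer.add_document(doc).await?;
/// writer.commit().await?; // flushes the current segment and updates segments.json
/// ```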
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Segment builder
    builder: AsyncMutex<Option<SegmentBuilder>>,
    /// List of committed segment IDs (hex strings)
    segment_ids: AsyncMutex<Vec<String>>,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Write schema
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Write empty segments list
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builder: AsyncMutex::new(None),
            segment_ids: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Read existing segment IDs (hex strings)
        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builder: AsyncMutex::new(None),
            segment_ids: AsyncMutex::new(segment_ids),
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document
    ///
    /// Documents are streamed to disk immediately, keeping memory usage bounded.
    /// When the builder reaches `max_docs_per_segment`, it is committed and a new one starts.
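    ///
    /// A sketch of the auto-commit behavior under a small segment limit:
    ///
    /// ```ignore
    /// // With max_docs_per_segment = 5, every fifth call commits the
    /// // in-progress segment before returning.
    /// for i in 0..12 {
    ///     let mut doc = Document::new();
    ///     doc.add_text(title, format!("Document {}", i));
    ///     writer.add_document(doc).await?;
    /// }
    /// writer.commit().await?; // flush the final partial segment
    /// ```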
    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
        let mut builder_guard = self.builder.lock().await;

        // Initialize builder if needed
        if builder_guard.is_none() {
            let mut builder =
                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
            for (field, tokenizer) in &self.tokenizers {
                builder.set_tokenizer(*field, tokenizer.clone_box());
            }
            *builder_guard = Some(builder);
        }

        let builder = builder_guard.as_mut().unwrap();
        let doc_id = builder.add_document(doc)?;

        // Check if we need to commit
        if builder.num_docs() >= self.config.max_docs_per_segment {
            let full_builder = builder_guard.take().unwrap();
            drop(builder_guard); // Release lock before async operation
            self.commit_builder(full_builder).await?;
        }

        Ok(doc_id)
    }

    /// Get current builder statistics for debugging
    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
        let builder_guard = self.builder.lock().await;
        builder_guard.as_ref().map(|b| b.stats())
    }

    /// Commit all pending segments to disk
    pub async fn commit(&self) -> Result<()> {
        let mut builder_guard = self.builder.lock().await;

        if let Some(builder) = builder_guard.take()
            && builder.num_docs() > 0
        {
            drop(builder_guard);
            self.commit_builder(builder).await?;
        }

        Ok(())
    }

    async fn commit_builder(&self, builder: SegmentBuilder) -> Result<()> {
        let segment_id = SegmentId::new();
        builder.build(self.directory.as_ref(), segment_id).await?;

        // Update segment list
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.push(segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Check if merge is needed
        if segment_ids.len() >= self.config.merge_threshold {
            drop(segment_ids); // Release lock
            self.maybe_merge().await?;
        }

        Ok(())
    }

    /// Merge segments if threshold is reached
    async fn maybe_merge(&self) -> Result<()> {
        let segment_ids = self.segment_ids.lock().await;

        if segment_ids.len() < self.config.merge_threshold {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Update segment list
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Delete old segments
        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }

    /// Force merge all segments into one
    pub async fn force_merge(&self) -> Result<()> {
        // First commit any pending documents
        self.commit().await?;

        let segment_ids = self.segment_ids.lock().await;
        if segment_ids.len() <= 1 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        // Load all segments
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Merge
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Update segment list
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Delete old segments
        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Check postings
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document
        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5, // Small segments for testing
            merge_threshold: 10,     // Don't auto-merge
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add documents in batches to create multiple segments
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).await.unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Open and check
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            merge_threshold: 100, // Don't auto-merge
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments
        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Should have 3 segments
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        // Force merge
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Verify all documents accessible
        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // Test match query with multiple default fields
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        // Test match query with multiple tokens
        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Verify hit has address (segment_id + doc_id)
        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Verify document retrieval by address
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // Also verify doc retrieval directly by global doc_id
        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with some documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Open with slice caching and perform some operations to warm up cache
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        // Perform a search to warm up the cache
        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Check cache stats - should have cached some data
        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        // Save the cache
        index.save_slice_cache().await.unwrap();

        // Verify cache file was written
        assert!(
            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
                .await
                .unwrap()
        );

        // Now open with cache loading
        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        // Cache should be prefilled
        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        // Search should still work
        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add document with multi-value field
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        // Add another document with different uris
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Verify document retrieval preserves all values
        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // Verify to_json returns array for multi-value field
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Verify both values are searchable
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        // Verify searching for non-existent value returns no results
        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
}