hermes_core/
index.rs

1//! Index - multi-segment async search index
2//!
3//! Components:
4//! - Index: main entry point for searching
5//! - IndexWriter: for adding documents and committing segments
6//! - Supports multiple segments with merge
7
8use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12#[cfg(feature = "native")]
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16#[cfg(feature = "native")]
17use crate::directories::DirectoryWriter;
18use crate::directories::{Directory, SliceCachingDirectory};
19use crate::dsl::{Document, Field, Schema};
20use crate::error::{Error, Result};
21#[cfg(feature = "native")]
22use crate::segment::{SegmentBuilder, SegmentMerger};
23use crate::segment::{SegmentId, SegmentReader};
24use crate::structures::BlockPostingList;
25#[cfg(feature = "native")]
26use crate::tokenizer::BoxedTokenizer;
27
28#[cfg(feature = "native")]
29use tokio::sync::Mutex as AsyncMutex;
30
31/// Default file name for the slice cache
32pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
33
/// Index configuration
///
/// Tuning knobs for search/indexing parallelism, per-segment caching,
/// and segment lifecycle (auto-commit and merge thresholds).
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max documents per segment before auto-commit
    pub max_docs_per_segment: u32,
    /// Min segments to trigger merge
    pub merge_threshold: usize,
}
52
53impl Default for IndexConfig {
54    fn default() -> Self {
55        #[cfg(feature = "native")]
56        let cpus = num_cpus::get().max(1);
57        #[cfg(not(feature = "native"))]
58        let cpus = 1;
59
60        Self {
61            num_threads: cpus,
62            num_indexing_threads: 1,
63            num_compression_threads: cpus,
64            term_cache_blocks: 256,
65            store_cache_blocks: 32,
66            max_docs_per_segment: 100_000,
67            merge_threshold: 5,
68        }
69    }
70}
71
/// Multi-segment async Index
///
/// The main entry point for searching. Manages multiple segments
/// and provides unified search across all of them.
pub struct Index<D: Directory> {
    // Backing storage for schema, segment list, and segment files.
    directory: Arc<D>,
    // Parsed schema loaded from `schema.json`.
    schema: Arc<Schema>,
    config: IndexConfig,
    // Currently loaded segment readers; replaced wholesale by `reload`.
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    // Fields searched when a query does not name a field explicitly.
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    // Rayon pool for CPU-bound work (native builds only).
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
86
87impl<D: Directory> Index<D> {
    /// Open an existing index from a directory
    ///
    /// Reads `schema.json` and the segment list, opens a reader for every
    /// listed segment, and (on native builds) creates a rayon pool sized by
    /// `config.num_threads` for CPU-bound search work.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Read segment list
        let segments = Self::load_segments(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            #[cfg(feature = "native")]
            thread_pool,
        })
    }
135
    /// Load readers for every segment listed in `segments.json`.
    ///
    /// Segments are opened in list order, and each reader is assigned a
    /// contiguous global doc-id offset (the running sum of prior segments'
    /// document counts), so global doc ids map deterministically onto
    /// (segment, local doc id) pairs.
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Read segments.json which lists all segment IDs.
        // A missing file simply means an empty index.
        let segments_path = Path::new("segments.json");
        if !directory.exists(segments_path).await? {
            return Ok(Vec::new());
        }

        let segments_slice = directory.open_read(segments_path).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in segment_ids {
            // IDs are stored as hex strings; a malformed one means the
            // segment list itself is corrupt.
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok(segments)
    }
173
174    /// Get the schema
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178
179    /// Get a reference to the underlying directory
180    pub fn directory(&self) -> &D {
181        &self.directory
182    }
183
184    /// Total number of documents across all segments
185    pub fn num_docs(&self) -> u32 {
186        self.segments.read().iter().map(|s| s.num_docs()).sum()
187    }
188
189    /// Get a document by global doc_id (async)
190    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
191        let segments = self.segments.read().clone();
192
193        let mut offset = 0u32;
194        for segment in segments.iter() {
195            let segment_docs = segment.meta().num_docs;
196            if doc_id < offset + segment_docs {
197                let local_doc_id = doc_id - offset;
198                return segment.doc(local_doc_id).await;
199            }
200            offset += segment_docs;
201        }
202
203        Ok(None)
204    }
205
206    /// Get posting lists for a term across all segments (async)
207    pub async fn get_postings(
208        &self,
209        field: Field,
210        term: &[u8],
211    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
212        let segments = self.segments.read().clone();
213        let mut results = Vec::new();
214
215        for segment in segments.iter() {
216            if let Some(postings) = segment.get_postings(field, term).await? {
217                results.push((Arc::clone(segment), postings));
218            }
219        }
220
221        Ok(results)
222    }
223
    /// Execute CPU-intensive work on thread pool (native only)
    ///
    /// Runs `f` on the index's rayon pool and awaits its result via a
    /// oneshot channel, keeping the async executor free of blocking work.
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            // Receiver may have been dropped if the caller gave up; ignore.
            let _ = tx.send(result);
        });
        // The sender is only dropped without sending if `f` panicked, so a
        // RecvError here genuinely means the pooled task died.
        rx.await.expect("Thread pool task panicked")
    }
238
239    /// Get segment readers for query execution
240    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
241        self.segments.read().clone()
242    }
243
244    /// Reload segments from directory (after new segments added)
245    pub async fn reload(&self) -> Result<()> {
246        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
247        *self.segments.write() = new_segments;
248        Ok(())
249    }
250
251    /// Search across all segments
252    pub async fn search(
253        &self,
254        query: &dyn crate::query::Query,
255        limit: usize,
256    ) -> Result<Vec<crate::query::SearchResult>> {
257        let segments = self.segments.read().clone();
258        let mut all_results = Vec::new();
259
260        for segment in &segments {
261            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
262            all_results.extend(results);
263        }
264
265        // Sort by score descending
266        all_results.sort_by(|a, b| {
267            b.score
268                .partial_cmp(&a.score)
269                .unwrap_or(std::cmp::Ordering::Equal)
270        });
271        all_results.truncate(limit);
272
273        Ok(all_results)
274    }
275
276    /// Search and return results with document addresses (no document content)
277    pub async fn search_with_addresses(
278        &self,
279        query: &dyn crate::query::Query,
280        limit: usize,
281    ) -> Result<crate::query::SearchResponse> {
282        let segments = self.segments.read().clone();
283        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
284
285        for segment in &segments {
286            let segment_id = segment.meta().id;
287            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
288            for result in results {
289                all_results.push((segment_id, result));
290            }
291        }
292
293        // Sort by score descending
294        all_results.sort_by(|a, b| {
295            b.1.score
296                .partial_cmp(&a.1.score)
297                .unwrap_or(std::cmp::Ordering::Equal)
298        });
299        all_results.truncate(limit);
300
301        let total_hits = all_results.len() as u32;
302        let hits: Vec<crate::query::SearchHit> = all_results
303            .into_iter()
304            .map(|(segment_id, result)| crate::query::SearchHit {
305                address: crate::query::DocAddress::new(segment_id, result.doc_id),
306                score: result.score,
307            })
308            .collect();
309
310        Ok(crate::query::SearchResponse { hits, total_hits })
311    }
312
313    /// Get a document by its unique address (segment_id + local doc_id)
314    pub async fn get_document(
315        &self,
316        address: &crate::query::DocAddress,
317    ) -> Result<Option<Document>> {
318        let segment_id = address
319            .segment_id_u128()
320            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
321
322        let segments = self.segments.read().clone();
323        for segment in &segments {
324            if segment.meta().id == segment_id {
325                return segment.doc(address.doc_id).await;
326            }
327        }
328
329        Ok(None)
330    }
331
332    /// Get the default fields for this index
333    pub fn default_fields(&self) -> &[crate::Field] {
334        &self.default_fields
335    }
336
    /// Set the default fields for query parsing
    ///
    /// Replaces the field list chosen at `open` time (schema defaults or
    /// all indexed text fields). Requires exclusive access to the index.
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }
341
    /// Get the tokenizer registry
    ///
    /// Shared (`Arc`) so parsers created from this index can hold a clone.
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }
346
347    /// Create a query parser for this index
348    ///
349    /// If the schema contains query router rules, they will be used to route
350    /// queries to specific fields based on regex patterns.
351    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
352        // Check if schema has query routers
353        let query_routers = self.schema.query_routers();
354        if !query_routers.is_empty() {
355            // Try to create a router from the schema's rules
356            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
357                return crate::dsl::QueryLanguageParser::with_router(
358                    Arc::clone(&self.schema),
359                    self.default_fields.clone(),
360                    Arc::clone(&self.tokenizers),
361                    router,
362                );
363            }
364        }
365
366        // Fall back to parser without router
367        crate::dsl::QueryLanguageParser::new(
368            Arc::clone(&self.schema),
369            self.default_fields.clone(),
370            Arc::clone(&self.tokenizers),
371        )
372    }
373
374    /// Parse and search using a query string
375    ///
376    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
377    /// and simple text (tokenized and searched across default fields).
378    /// Returns document addresses (segment_id + doc_id) without document content.
379    pub async fn query(
380        &self,
381        query_str: &str,
382        limit: usize,
383    ) -> Result<crate::query::SearchResponse> {
384        let parser = self.query_parser();
385        let query = parser.parse(query_str).map_err(Error::Query)?;
386        self.search_with_addresses(query.as_ref(), limit).await
387    }
388}
389
/// Methods for opening index with slice caching
impl<D: Directory> Index<SliceCachingDirectory<D>> {
    /// Open an index with slice caching, automatically loading the cache file if present
    ///
    /// This wraps the directory in a SliceCachingDirectory and attempts to load
    /// any existing slice cache file to prefill the cache with hot data.
    /// Cache-load failures are deliberately ignored: the cache is an
    /// optimization, never a correctness requirement.
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        // Try to load existing slice cache; read via the inner directory so
        // the cache file itself doesn't pollute the slice cache.
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            // Best-effort: a stale/corrupt cache file is silently skipped.
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Serialize the current slice cache to the index directory
    ///
    /// This saves all cached slices to a single file that can be loaded
    /// on subsequent index opens for faster startup.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        // Written through the inner directory, bypassing the cache layer.
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Get slice cache statistics
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}
438
/// Warm up the slice cache by opening an index and performing typical read operations
///
/// Opens the index through a `SliceCachingDirectory` so every byte range read
/// during startup is recorded, then serializes the recorded cache to a file.
/// Subsequent opens can prefill from that file and avoid cold-start latency.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    // Opening the index reads segment metadata and term-dictionary
    // essentials through the caching wrapper, which records them.
    let index = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;

    // Additional warmup (term iteration, sample queries) could be added here.

    // Persist everything recorded so far.
    index.save_slice_cache().await
}
465
// Clone is only provided on native builds, where the `thread_pool` field
// exists; every shared component is an `Arc`, so cloning is cheap. The
// segment list is snapshotted, so the clone does not track later reloads
// of the original.
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
480
/// Async IndexWriter for adding documents and committing segments
///
/// Internally manages parallel segment building based on `num_indexing_threads` config.
/// Documents are distributed across multiple segment builders round-robin style.
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    // Per-field tokenizers applied to every builder this writer creates.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Multiple segment builders for parallel indexing
    segment_builders: AsyncMutex<Vec<SegmentBuilder>>,
    /// Round-robin counter for distributing documents
    next_builder: std::sync::atomic::AtomicUsize,
    /// List of committed segment IDs (hex strings)
    segment_ids: AsyncMutex<Vec<String>>,
}
498
499#[cfg(feature = "native")]
500impl<D: DirectoryWriter> IndexWriter<D> {
    /// Create a new index in the directory
    ///
    /// Writes `schema.json` and an empty `segments.json`, overwriting any
    /// existing files at those paths.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Write schema
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Write empty segments list
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        Ok(Self {
            directory,
            schema,
            config,
            tokenizers: FxHashMap::default(),
            segment_builders: AsyncMutex::new(Vec::new()),
            next_builder: std::sync::atomic::AtomicUsize::new(0),
            segment_ids: AsyncMutex::new(Vec::new()),
        })
    }
530
    /// Open an existing index for writing
    ///
    /// Loads the schema and the committed segment ID list; new commits will
    /// append to (and merges rewrite) that list.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Read existing segment IDs (hex strings)
        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        Ok(Self {
            directory,
            schema,
            config,
            tokenizers: FxHashMap::default(),
            segment_builders: AsyncMutex::new(Vec::new()),
            next_builder: std::sync::atomic::AtomicUsize::new(0),
            segment_ids: AsyncMutex::new(segment_ids),
        })
    }
558
559    /// Get the schema
560    pub fn schema(&self) -> &Schema {
561        &self.schema
562    }
563
    /// Set tokenizer for a field
    ///
    /// Applies to segment builders created after this call; existing
    /// in-flight builders keep their previous tokenizers.
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }
568
569    /// Add a document
570    ///
571    /// Documents are distributed across internal segment builders based on
572    /// `num_indexing_threads` config. When any builder reaches `max_docs_per_segment`,
573    /// it is committed to disk in the background.
574    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
575        let num_builders = self.config.num_indexing_threads.max(1);
576        let mut builders = self.segment_builders.lock().await;
577
578        // Initialize builders if empty
579        if builders.is_empty() {
580            for _ in 0..num_builders {
581                let mut builder = SegmentBuilder::new((*self.schema).clone());
582                for (field, tokenizer) in &self.tokenizers {
583                    builder.set_tokenizer(*field, tokenizer.clone_box());
584                }
585                builders.push(builder);
586            }
587        }
588
589        // Round-robin distribution
590        let idx = self
591            .next_builder
592            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
593            % num_builders;
594        let builder = &mut builders[idx];
595        let doc_id = builder.add_document(doc)?;
596
597        // Check if this builder needs to be committed
598        if builder.num_docs() >= self.config.max_docs_per_segment {
599            // Take the full builder and replace with a new one
600            let full_builder = std::mem::replace(builder, {
601                let mut new_builder = SegmentBuilder::new((*self.schema).clone());
602                for (field, tokenizer) in &self.tokenizers {
603                    new_builder.set_tokenizer(*field, tokenizer.clone_box());
604                }
605                new_builder
606            });
607            drop(builders); // Release lock before async operation
608            self.commit_segment(full_builder).await?;
609        }
610
611        Ok(doc_id)
612    }
613
    /// Commit all pending segments to disk
    ///
    /// Takes ownership of every in-memory builder, flushes each non-empty
    /// one as its own segment in parallel via `tokio::spawn`, then appends
    /// the new segment IDs to `segments.json` under the `segment_ids` lock.
    pub async fn commit(&self) -> Result<()> {
        let mut builders = self.segment_builders.lock().await;

        if builders.is_empty() {
            return Ok(());
        }

        // Take all builders and replace with empty vec
        let builders_to_commit: Vec<SegmentBuilder> = std::mem::take(&mut *builders);
        drop(builders); // Release lock before async operations

        // Commit all non-empty builders in parallel
        let mut handles = Vec::new();

        for builder in builders_to_commit {
            if builder.num_docs() == 0 {
                continue;
            }

            let directory = Arc::clone(&self.directory);
            let compression_threads = self.config.num_compression_threads;
            // Each task builds one segment under a freshly generated ID and
            // reports the ID back as a hex string.
            let handle = tokio::spawn(async move {
                let segment_id = SegmentId::new();
                builder
                    .build_with_threads(directory.as_ref(), segment_id, compression_threads)
                    .await?;
                Ok::<String, Error>(segment_id.to_hex())
            });
            handles.push(handle);
        }

        // Collect all new segment IDs; the first build or join error aborts
        // the commit (already-built segments stay unreferenced on disk).
        let mut new_segment_ids = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(Ok(id)) => new_segment_ids.push(id),
                Ok(Err(e)) => return Err(e),
                Err(e) => return Err(Error::Internal(format!("Task join error: {}", e))),
            }
        }

        // Update segment list atomically
        if !new_segment_ids.is_empty() {
            let mut segment_ids = self.segment_ids.lock().await;
            segment_ids.extend(new_segment_ids);

            let segments_bytes = serde_json::to_vec(&*segment_ids)
                .map_err(|e| Error::Serialization(e.to_string()))?;
            self.directory
                .write(Path::new("segments.json"), &segments_bytes)
                .await?;
        }

        Ok(())
    }
672
    /// Flush a single full builder to disk as a new segment, register it in
    /// `segments.json`, and trigger a merge when the segment count reaches
    /// the configured threshold.
    async fn commit_segment(&self, builder: SegmentBuilder) -> Result<()> {
        if builder.num_docs() == 0 {
            return Ok(());
        }

        let segment_id = SegmentId::new();
        builder
            .build_with_threads(
                self.directory.as_ref(),
                segment_id,
                self.config.num_compression_threads,
            )
            .await?;

        // Update segment list (lock held across the write so concurrent
        // commits serialize their segments.json updates).
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.push(segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Check if merge is needed
        if segment_ids.len() >= self.config.merge_threshold {
            drop(segment_ids); // Release lock: maybe_merge re-acquires it
            self.maybe_merge().await?;
        }

        Ok(())
    }
705
706    /// Merge segments if threshold is reached
707    async fn maybe_merge(&self) -> Result<()> {
708        let segment_ids = self.segment_ids.lock().await;
709
710        if segment_ids.len() < self.config.merge_threshold {
711            return Ok(());
712        }
713
714        let ids_to_merge: Vec<String> = segment_ids.clone();
715        drop(segment_ids);
716
717        // Load segment readers
718        let mut readers = Vec::new();
719        let mut doc_offset = 0u32;
720
721        for id_str in &ids_to_merge {
722            let segment_id = SegmentId::from_hex(id_str)
723                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
724            let reader = SegmentReader::open(
725                self.directory.as_ref(),
726                segment_id,
727                Arc::clone(&self.schema),
728                doc_offset,
729                self.config.term_cache_blocks,
730            )
731            .await?;
732            doc_offset += reader.meta().num_docs;
733            readers.push(reader);
734        }
735
736        // Merge into new segment
737        let merger = SegmentMerger::new(Arc::clone(&self.schema));
738        let new_segment_id = SegmentId::new();
739        merger
740            .merge(self.directory.as_ref(), &readers, new_segment_id)
741            .await?;
742
743        // Update segment list
744        let mut segment_ids = self.segment_ids.lock().await;
745        segment_ids.clear();
746        segment_ids.push(new_segment_id.to_hex());
747
748        let segments_bytes =
749            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
750        self.directory
751            .write(Path::new("segments.json"), &segments_bytes)
752            .await?;
753
754        // Delete old segments
755        for id_str in ids_to_merge {
756            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
757                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
758            }
759        }
760
761        Ok(())
762    }
763
764    /// Force merge all segments into one
765    pub async fn force_merge(&self) -> Result<()> {
766        // First commit any pending documents
767        self.commit().await?;
768
769        let segment_ids = self.segment_ids.lock().await;
770        if segment_ids.len() <= 1 {
771            return Ok(());
772        }
773
774        let ids_to_merge: Vec<String> = segment_ids.clone();
775        drop(segment_ids);
776
777        // Load all segments
778        let mut readers = Vec::new();
779        let mut doc_offset = 0u32;
780
781        for id_str in &ids_to_merge {
782            let segment_id = SegmentId::from_hex(id_str)
783                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
784            let reader = SegmentReader::open(
785                self.directory.as_ref(),
786                segment_id,
787                Arc::clone(&self.schema),
788                doc_offset,
789                self.config.term_cache_blocks,
790            )
791            .await?;
792            doc_offset += reader.meta().num_docs;
793            readers.push(reader);
794        }
795
796        // Merge
797        let merger = SegmentMerger::new(Arc::clone(&self.schema));
798        let new_segment_id = SegmentId::new();
799        merger
800            .merge(self.directory.as_ref(), &readers, new_segment_id)
801            .await?;
802
803        // Update segment list
804        let mut segment_ids = self.segment_ids.lock().await;
805        segment_ids.clear();
806        segment_ids.push(new_segment_id.to_hex());
807
808        let segments_bytes =
809            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
810        self.directory
811            .write(Path::new("segments.json"), &segments_bytes)
812            .await?;
813
814        // Delete old segments
815        for id_str in ids_to_merge {
816            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
817                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
818            }
819        }
820
821        Ok(())
822    }
823}
824
825#[cfg(test)]
826mod tests {
827    use super::*;
828    use crate::directories::RamDirectory;
829    use crate::dsl::SchemaBuilder;
830
    // End-to-end smoke test: create an index in RAM, add two documents,
    // commit, then verify doc count, postings lookup, and doc retrieval.
    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Check postings: term lookup is on the lowercased token "world"
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document
        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }
871
872    #[tokio::test]
873    async fn test_multiple_segments() {
874        let mut schema_builder = SchemaBuilder::default();
875        let title = schema_builder.add_text_field("title", true, true);
876        let schema = schema_builder.build();
877
878        let dir = RamDirectory::new();
879        let config = IndexConfig {
880            max_docs_per_segment: 5, // Small segments for testing
881            merge_threshold: 10,     // Don't auto-merge
882            ..Default::default()
883        };
884
885        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
886            .await
887            .unwrap();
888
889        // Add documents in batches to create multiple segments
890        for batch in 0..3 {
891            for i in 0..5 {
892                let mut doc = Document::new();
893                doc.add_text(title, format!("Document {} batch {}", i, batch));
894                writer.add_document(doc).await.unwrap();
895            }
896            writer.commit().await.unwrap();
897        }
898
899        // Open and check
900        let index = Index::open(dir, config).await.unwrap();
901        assert_eq!(index.num_docs(), 15);
902        assert_eq!(index.segment_readers().len(), 3);
903    }
904
905    #[tokio::test]
906    async fn test_segment_merge() {
907        let mut schema_builder = SchemaBuilder::default();
908        let title = schema_builder.add_text_field("title", true, true);
909        let schema = schema_builder.build();
910
911        let dir = RamDirectory::new();
912        let config = IndexConfig {
913            max_docs_per_segment: 3,
914            merge_threshold: 100, // Don't auto-merge
915            ..Default::default()
916        };
917
918        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
919            .await
920            .unwrap();
921
922        // Create multiple segments
923        for i in 0..9 {
924            let mut doc = Document::new();
925            doc.add_text(title, format!("Document {}", i));
926            writer.add_document(doc).await.unwrap();
927        }
928        writer.commit().await.unwrap();
929
930        // Should have 3 segments
931        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
932        assert_eq!(index.segment_readers().len(), 3);
933
934        // Force merge
935        let writer = IndexWriter::open(dir.clone(), config.clone())
936            .await
937            .unwrap();
938        writer.force_merge().await.unwrap();
939
940        // Should have 1 segment now
941        let index = Index::open(dir, config).await.unwrap();
942        assert_eq!(index.segment_readers().len(), 1);
943        assert_eq!(index.num_docs(), 9);
944
945        // Verify all documents accessible
946        for i in 0..9 {
947            let doc = index.doc(i).await.unwrap().unwrap();
948            assert_eq!(
949                doc.get_first(title).unwrap().as_text(),
950                Some(format!("Document {}", i).as_str())
951            );
952        }
953    }
954
955    #[tokio::test]
956    async fn test_match_query() {
957        let mut schema_builder = SchemaBuilder::default();
958        let title = schema_builder.add_text_field("title", true, true);
959        let body = schema_builder.add_text_field("body", true, true);
960        let schema = schema_builder.build();
961
962        let dir = RamDirectory::new();
963        let config = IndexConfig::default();
964
965        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
966            .await
967            .unwrap();
968
969        let mut doc1 = Document::new();
970        doc1.add_text(title, "rust programming");
971        doc1.add_text(body, "Learn rust language");
972        writer.add_document(doc1).await.unwrap();
973
974        let mut doc2 = Document::new();
975        doc2.add_text(title, "python programming");
976        doc2.add_text(body, "Learn python language");
977        writer.add_document(doc2).await.unwrap();
978
979        writer.commit().await.unwrap();
980
981        let index = Index::open(dir, config).await.unwrap();
982
983        // Test match query with multiple default fields
984        let results = index.query("rust", 10).await.unwrap();
985        assert_eq!(results.hits.len(), 1);
986
987        // Test match query with multiple tokens
988        let results = index.query("rust programming", 10).await.unwrap();
989        assert!(!results.hits.is_empty());
990
991        // Verify hit has address (segment_id + doc_id)
992        let hit = &results.hits[0];
993        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
994
995        // Verify document retrieval by address
996        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
997        assert!(
998            !doc.field_values().is_empty(),
999            "Doc should have field values"
1000        );
1001
1002        // Also verify doc retrieval directly by global doc_id
1003        let doc = index.doc(0).await.unwrap().unwrap();
1004        assert!(
1005            !doc.field_values().is_empty(),
1006            "Doc should have field values"
1007        );
1008    }
1009
1010    #[tokio::test]
1011    async fn test_slice_cache_warmup_and_load() {
1012        use crate::directories::SliceCachingDirectory;
1013
1014        let mut schema_builder = SchemaBuilder::default();
1015        let title = schema_builder.add_text_field("title", true, true);
1016        let body = schema_builder.add_text_field("body", true, true);
1017        let schema = schema_builder.build();
1018
1019        let dir = RamDirectory::new();
1020        let config = IndexConfig::default();
1021
1022        // Create index with some documents
1023        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1024            .await
1025            .unwrap();
1026
1027        for i in 0..10 {
1028            let mut doc = Document::new();
1029            doc.add_text(title, format!("Document {} about rust", i));
1030            doc.add_text(body, format!("This is body text number {}", i));
1031            writer.add_document(doc).await.unwrap();
1032        }
1033        writer.commit().await.unwrap();
1034
1035        // Open with slice caching and perform some operations to warm up cache
1036        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
1037        let index = Index::open(caching_dir, config.clone()).await.unwrap();
1038
1039        // Perform a search to warm up the cache
1040        let results = index.query("rust", 10).await.unwrap();
1041        assert!(!results.hits.is_empty());
1042
1043        // Check cache stats - should have cached some data
1044        let stats = index.slice_cache_stats();
1045        assert!(stats.total_bytes > 0, "Cache should have data after search");
1046
1047        // Save the cache
1048        index.save_slice_cache().await.unwrap();
1049
1050        // Verify cache file was written
1051        assert!(
1052            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
1053                .await
1054                .unwrap()
1055        );
1056
1057        // Now open with cache loading
1058        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
1059            .await
1060            .unwrap();
1061
1062        // Cache should be prefilled
1063        let stats2 = index2.slice_cache_stats();
1064        assert!(
1065            stats2.total_bytes > 0,
1066            "Cache should be prefilled from file"
1067        );
1068
1069        // Search should still work
1070        let results2 = index2.query("rust", 10).await.unwrap();
1071        assert_eq!(results.hits.len(), results2.hits.len());
1072    }
1073
1074    #[tokio::test]
1075    async fn test_multivalue_field_indexing_and_search() {
1076        let mut schema_builder = SchemaBuilder::default();
1077        let uris = schema_builder.add_text_field("uris", true, true);
1078        let title = schema_builder.add_text_field("title", true, true);
1079        let schema = schema_builder.build();
1080
1081        let dir = RamDirectory::new();
1082        let config = IndexConfig::default();
1083
1084        // Create index and add document with multi-value field
1085        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1086            .await
1087            .unwrap();
1088
1089        let mut doc = Document::new();
1090        doc.add_text(uris, "one");
1091        doc.add_text(uris, "two");
1092        doc.add_text(title, "Test Document");
1093        writer.add_document(doc).await.unwrap();
1094
1095        // Add another document with different uris
1096        let mut doc2 = Document::new();
1097        doc2.add_text(uris, "three");
1098        doc2.add_text(title, "Another Document");
1099        writer.add_document(doc2).await.unwrap();
1100
1101        writer.commit().await.unwrap();
1102
1103        // Open for reading
1104        let index = Index::open(dir, config).await.unwrap();
1105        assert_eq!(index.num_docs(), 2);
1106
1107        // Verify document retrieval preserves all values
1108        let doc = index.doc(0).await.unwrap().unwrap();
1109        let all_uris: Vec<_> = doc.get_all(uris).collect();
1110        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
1111        assert_eq!(all_uris[0].as_text(), Some("one"));
1112        assert_eq!(all_uris[1].as_text(), Some("two"));
1113
1114        // Verify to_json returns array for multi-value field
1115        let json = doc.to_json(index.schema());
1116        let uris_json = json.get("uris").unwrap();
1117        assert!(uris_json.is_array(), "Multi-value field should be an array");
1118        let uris_arr = uris_json.as_array().unwrap();
1119        assert_eq!(uris_arr.len(), 2);
1120        assert_eq!(uris_arr[0].as_str(), Some("one"));
1121        assert_eq!(uris_arr[1].as_str(), Some("two"));
1122
1123        // Verify both values are searchable
1124        let results = index.query("uris:one", 10).await.unwrap();
1125        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
1126        assert_eq!(results.hits[0].address.doc_id, 0);
1127
1128        let results = index.query("uris:two", 10).await.unwrap();
1129        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
1130        assert_eq!(results.hits[0].address.doc_id, 0);
1131
1132        let results = index.query("uris:three", 10).await.unwrap();
1133        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
1134        assert_eq!(results.hits[0].address.doc_id, 1);
1135
1136        // Verify searching for non-existent value returns no results
1137        let results = index.query("uris:nonexistent", 10).await.unwrap();
1138        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
1139    }
1140}