hermes_core/
index.rs

//! Index - multi-segment async search index
//!
//! Components:
//! - Index: main entry point for searching
//! - IndexWriter: for adding documents and committing segments
//! - Supports multiple segments with merge
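//!
//! A minimal end-to-end sketch (an `ignore` doctest; the import paths assume these
//! types are re-exported at the crate root, so adjust them to the actual module layout):
//!
//! ```ignore
//! use hermes_core::{Document, Index, IndexConfig, IndexWriter, RamDirectory, SchemaBuilder};
//!
//! async fn example() {
//!     // Schema with a single indexed + stored text field.
//!     let mut builder = SchemaBuilder::default();
//!     let title = builder.add_text_field("title", true, true);
//!     let schema = builder.build();
//!
//!     // Write one document and commit it as a segment.
//!     let dir = RamDirectory::new();
//!     let writer = IndexWriter::create(dir.clone(), schema, IndexConfig::default())
//!         .await
//!         .unwrap();
//!     let mut doc = Document::new();
//!     doc.add_text(title, "Hello World");
//!     writer.add_document(doc).await.unwrap();
//!     writer.commit().await.unwrap();
//!
//!     // Open the index for searching.
//!     let index = Index::open(dir, IndexConfig::default()).await.unwrap();
//!     let response = index.query("hello", 10).await.unwrap();
//!     assert_eq!(response.hits.len(), 1);
//! }
//! ```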

use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

use crate::DocId;
#[cfg(feature = "native")]
use crate::directories::DirectoryWriter;
use crate::directories::{Directory, SliceCachingDirectory};
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
#[cfg(feature = "native")]
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentMerger};
use crate::segment::{SegmentId, SegmentReader};
use crate::structures::BlockPostingList;
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;

#[cfg(feature = "native")]
use rand::Rng;
#[cfg(feature = "native")]
use tokio::sync::Mutex as AsyncMutex;
#[cfg(feature = "native")]
use tokio::task::JoinHandle;

/// Default file name for the slice cache
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

/// Index configuration
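///
/// A typical override keeps the defaults and adjusts a few fields via struct
/// update syntax (illustrative values, not tuning recommendations):
///
/// ```ignore
/// use hermes_core::IndexConfig;
///
/// let config = IndexConfig {
///     num_indexing_threads: 4,
///     max_docs_per_segment: 50_000,
///     ..Default::default()
/// };
/// assert_eq!(config.num_indexing_threads, 4);
/// ```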
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents are distributed randomly across them)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment
    pub store_cache_blocks: usize,
    /// Max documents per segment before the builder is automatically flushed to a new segment
    pub max_docs_per_segment: u32,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let cpus = num_cpus::get().max(1);
        #[cfg(not(feature = "native"))]
        let cpus = 1;

        Self {
            num_threads: cpus,
            num_indexing_threads: 1,
            num_compression_threads: cpus,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_docs_per_segment: 100_000,
            optimization: crate::structures::IndexOptimization::default(),
        }
    }
}

/// Multi-segment async Index
///
/// The main entry point for searching. Manages multiple segments
/// and provides unified search across all of them.
pub struct Index<D: Directory> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<D: Directory> Index<D> {
    /// Open an existing index from a directory
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Read segment list
        let segments = Self::load_segments(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            #[cfg(feature = "native")]
            thread_pool,
        })
    }

    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        // Read segments.json which lists all segment IDs
        let segments_path = Path::new("segments.json");
        if !directory.exists(segments_path).await? {
            return Ok(Vec::new());
        }

        let segments_slice = directory.open_read(segments_path).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in segment_ids {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok(segments)
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get a reference to the underlying directory
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Total number of documents across all segments
    pub fn num_docs(&self) -> u32 {
        self.segments.read().iter().map(|s| s.num_docs()).sum()
    }

    /// Get a document by global doc_id (async)
    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
        let segments = self.segments.read().clone();

        let mut offset = 0u32;
        for segment in segments.iter() {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }

        Ok(None)
    }

    /// Get posting lists for a term across all segments (async)
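    ///
    /// A small sketch of a raw term lookup (`ignore` doctest; `title` is a text
    /// field from this index's schema, and the term bytes are assumed to already
    /// be lowercased the same way the indexing tokenizer lowercases them):
    ///
    /// ```ignore
    /// let postings = index.get_postings(title, b"world").await.unwrap();
    /// for (segment, list) in &postings {
    ///     println!("segment {} has {} matching docs", segment.meta().id, list.doc_count());
    /// }
    /// ```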
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
        let segments = self.segments.read().clone();
        let mut results = Vec::new();

        for segment in segments.iter() {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((Arc::clone(segment), postings));
            }
        }

        Ok(results)
    }

    /// Execute CPU-intensive work on thread pool (native only)
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }

    /// Get segment readers for query execution
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }

    /// Reload segments from directory (after new segments added)
    pub async fn reload(&self) -> Result<()> {
        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
        *self.segments.write() = new_segments;
        Ok(())
    }

    /// Search across all segments
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let segments = self.segments.read().clone();
        let mut all_results = Vec::new();

        for segment in &segments {
            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
            all_results.extend(results);
        }

        // Sort by score descending
        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_results.truncate(limit);

        Ok(all_results)
    }

    /// Search and return results with document addresses (no document content)
    pub async fn search_with_addresses(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        let segments = self.segments.read().clone();
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        for segment in &segments {
            let segment_id = segment.meta().id;
            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        // Sort by score descending
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_results.truncate(limit);

        let total_hits = all_results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Get a document by its unique address (segment_id + local doc_id)
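    ///
    /// Typical flow (`ignore` doctest): run an address-returning search first,
    /// then fetch the stored document for a hit.
    ///
    /// ```ignore
    /// let response = index.query("rust", 10).await.unwrap();
    /// if let Some(hit) = response.hits.first() {
    ///     let doc = index.get_document(&hit.address).await.unwrap();
    ///     assert!(doc.is_some());
    /// }
    /// ```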
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<Document>> {
        let segment_id = address
            .segment_id_u128()
            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;

        let segments = self.segments.read().clone();
        for segment in &segments {
            if segment.meta().id == segment_id {
                return segment.doc(address.doc_id).await;
            }
        }

        Ok(None)
    }

    /// Get the default fields for this index
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Set the default fields for query parsing
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    /// Get the tokenizer registry
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }

    /// Create a query parser for this index
    ///
    /// If the schema contains query router rules, they will be used to route
    /// queries to specific fields based on regex patterns.
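    ///
    /// Explicit parsing sketch (`ignore` doctest); this is essentially what
    /// `Index::query` does internally:
    ///
    /// ```ignore
    /// let parser = index.query_parser();
    /// let query = parser.parse("title:rust AND body:language").unwrap();
    /// let response = index.search_with_addresses(query.as_ref(), 10).await.unwrap();
    /// ```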
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        // Check if schema has query routers
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty() {
            // Try to create a router from the schema's rules
            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
                return crate::dsl::QueryLanguageParser::with_router(
                    Arc::clone(&self.schema),
                    self.default_fields.clone(),
                    Arc::clone(&self.tokenizers),
                    router,
                );
            }
        }

        // Fall back to parser without router
        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

    /// Parse and search using a query string
    ///
    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
    /// and simple text (tokenized and searched across default fields).
    /// Returns document addresses (segment_id + doc_id) without document content.
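    ///
    /// Illustrative query strings (`ignore` doctest; the field names come from
    /// this index's schema):
    ///
    /// ```ignore
    /// // Plain text is tokenized and searched across the default fields.
    /// let response = index.query("rust programming", 10).await.unwrap();
    ///
    /// // Field-scoped terms can be combined with boolean operators.
    /// let response = index.query("title:rust AND body:language", 10).await.unwrap();
    /// ```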
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser.parse(query_str).map_err(Error::Query)?;
        self.search_with_addresses(query.as_ref(), limit).await
    }
}

/// Methods for opening index with slice caching
impl<D: Directory> Index<SliceCachingDirectory<D>> {
    /// Open an index with slice caching, automatically loading the cache file if present
    ///
    /// This wraps the directory in a SliceCachingDirectory and attempts to load
    /// any existing slice cache file to prefill the cache with hot data.
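    ///
    /// A sketch of the full cache lifecycle (`ignore` doctest; assumes a `dir`
    /// that implements `DirectoryWriter` and an `IndexConfig` are in scope):
    /// warm up once, persist the cache, then reopen with it prefilled.
    ///
    /// ```ignore
    /// // First run: open, search to populate the cache, then persist it.
    /// let index = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024).await.unwrap();
    /// index.query("rust", 10).await.unwrap();
    /// index.save_slice_cache().await.unwrap();
    ///
    /// // Later runs: the cache file is loaded automatically and prefills the cache.
    /// let index = Index::open_with_cache(dir, config, 1024 * 1024).await.unwrap();
    /// assert!(index.slice_cache_stats().total_bytes > 0);
    /// ```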
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        // Try to load existing slice cache
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Serialize the current slice cache to the index directory
    ///
    /// This saves all cached slices to a single file that can be loaded
    /// on subsequent index opens for faster startup.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Get slice cache statistics
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}

/// Warm up the slice cache by opening an index and performing typical read operations
///
/// This function opens an index using a SliceCachingDirectory, performs operations
/// that would typically be done during search (reading term dictionaries, posting lists),
/// and then serializes the cache to a file for future use.
///
/// The resulting cache file contains all the "hot" data that was read during warmup,
/// allowing subsequent index opens to prefill the cache and avoid cold-start latency.
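///
/// Minimal usage sketch (`ignore` doctest; `dir` is any `DirectoryWriter` and the
/// cache budget is illustrative):
///
/// ```ignore
/// warmup_and_save_slice_cache(dir, IndexConfig::default(), 64 * 1024 * 1024).await.unwrap();
/// ```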
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    // Warm up by loading segment metadata and term dictionaries
    // The SegmentReader::open already reads essential metadata
    // Additional warmup can be done by iterating terms or doing sample queries

    // Save the cache
    index.save_slice_cache().await?;

    Ok(())
}

#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}

/// Async IndexWriter for adding documents and committing segments
///
/// Features:
/// - Parallel indexing with multiple segment builders
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
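///
/// A small sketch of parallel indexing followed by a merge (`ignore` doctest;
/// crate-root re-exports are assumed):
///
/// ```ignore
/// use hermes_core::{Document, IndexConfig, IndexWriter, RamDirectory, SchemaBuilder};
///
/// async fn example() {
///     let mut builder = SchemaBuilder::default();
///     let title = builder.add_text_field("title", true, true);
///     let schema = builder.build();
///
///     // Two segment builders fill in parallel; small segments for illustration.
///     let config = IndexConfig {
///         num_indexing_threads: 2,
///         max_docs_per_segment: 1_000,
///         ..Default::default()
///     };
///     let writer = IndexWriter::create(RamDirectory::new(), schema, config)
///         .await
///         .unwrap();
///
///     for i in 0..10 {
///         let mut doc = Document::new();
///         doc.add_text(title, format!("Document {}", i));
///         writer.add_document(doc).await.unwrap();
///     }
///     writer.commit().await.unwrap();
///
///     // Optionally collapse everything into a single segment.
///     writer.force_merge().await.unwrap();
/// }
/// ```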
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Multiple segment builders for parallel indexing
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
    /// List of committed segment IDs (hex strings)
    segment_ids: AsyncMutex<Vec<String>>,
    /// Background segment build tasks, allowing document ingestion to continue while building
    background_builds: AsyncMutex<Vec<JoinHandle<Result<String>>>>,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Write schema
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Write empty segments list
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Create multiple builders for parallel indexing
        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_ids: AsyncMutex::new(Vec::new()),
            background_builds: AsyncMutex::new(Vec::new()),
        })
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Read schema
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Read existing segment IDs (hex strings)
        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        // Create multiple builders for parallel indexing
        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_ids: AsyncMutex::new(segment_ids),
            background_builds: AsyncMutex::new(Vec::new()),
        })
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

    /// Add a document
    ///
    /// Documents are distributed randomly across multiple builders for parallel indexing.
    /// Random distribution avoids atomic contention and provides better load balancing.
    /// When a builder reaches `max_docs_per_segment`, it is flushed to a new segment in the background and a fresh builder takes its place.
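    ///
    /// Ingestion-loop sketch (`ignore` doctest; `docs` is any iterator of `Document`s):
    /// keep adding documents and call `commit` once at the end; intermediate segment
    /// builds happen in the background as builders fill up.
    ///
    /// ```ignore
    /// for doc in docs {
    ///     writer.add_document(doc).await.unwrap();
    /// }
    /// writer.commit().await.unwrap();
    /// ```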
    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
        // Random selection of builder - avoids atomic contention
        let builder_idx = rand::rng().random_range(0..self.builders.len());

        let mut builder_guard = self.builders[builder_idx].lock().await;

        // Initialize builder if needed
        if builder_guard.is_none() {
            let mut builder =
                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
            for (field, tokenizer) in &self.tokenizers {
                builder.set_tokenizer(*field, tokenizer.clone_box());
            }
            *builder_guard = Some(builder);
        }

        let builder = builder_guard.as_mut().unwrap();
        let doc_id = builder.add_document(doc)?;

        // Flush the builder in the background if it has reached the segment size limit
        if builder.num_docs() >= self.config.max_docs_per_segment {
            let full_builder = builder_guard.take().unwrap();
            drop(builder_guard); // Release lock before spawning background task
            self.spawn_background_build(full_builder).await;
        }

        Ok(doc_id)
    }

    /// Spawn a background task to build a segment without blocking document ingestion
    async fn spawn_background_build(&self, builder: SegmentBuilder) {
        let directory = Arc::clone(&self.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();

        let handle = tokio::spawn(async move {
            builder.build(directory.as_ref(), segment_id).await?;
            Ok(segment_hex)
        });

        let mut builds = self.background_builds.lock().await;
        builds.push(handle);
    }

    /// Wait for all background builds to complete
    async fn wait_for_background_builds(&self) -> Result<()> {
        let mut builds = self.background_builds.lock().await;
        let mut completed_ids = Vec::new();

        for handle in builds.drain(..) {
            match handle.await {
                Ok(Ok(segment_hex)) => completed_ids.push(segment_hex),
                Ok(Err(e)) => return Err(e),
                Err(e) => return Err(Error::Io(std::io::Error::other(e))),
            }
        }
        drop(builds);

        // Register completed segment IDs
        if !completed_ids.is_empty() {
            let mut segment_ids = self.segment_ids.lock().await;
            segment_ids.extend(completed_ids);

            let segments_bytes = serde_json::to_vec(&*segment_ids)
                .map_err(|e| Error::Serialization(e.to_string()))?;
            self.directory
                .write(Path::new("segments.json"), &segments_bytes)
                .await?;
        }

        Ok(())
    }

    /// Get current builder statistics for debugging (aggregated from all builders)
    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
        let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;

        for builder_mutex in &self.builders {
            let guard = builder_mutex.lock().await;
            if let Some(builder) = guard.as_ref() {
                let stats = builder.stats();
                if let Some(ref mut total) = total_stats {
                    total.num_docs += stats.num_docs;
                    total.unique_terms += stats.unique_terms;
                    total.postings_in_memory += stats.postings_in_memory;
                    total.interned_strings += stats.interned_strings;
                    total.doc_field_lengths_size += stats.doc_field_lengths_size;
                } else {
                    total_stats = Some(stats);
                }
            }
        }

        total_stats
    }

    /// Commit all pending segments to disk
    ///
    /// This waits for all background builds to complete, then commits any remaining
    /// in-progress builders.
    pub async fn commit(&self) -> Result<()> {
        // First, wait for all background builds to complete
        self.wait_for_background_builds().await?;

        // Collect all builders that have documents
        let mut builders_to_commit = Vec::new();

        for builder_mutex in &self.builders {
            let mut guard = builder_mutex.lock().await;
            if let Some(builder) = guard.take()
                && builder.num_docs() > 0
            {
                builders_to_commit.push(builder);
            }
        }

        // Build remaining segments in parallel using tokio tasks
        let mut handles = Vec::new();
        for builder in builders_to_commit {
            let directory = Arc::clone(&self.directory);
            let segment_id = SegmentId::new();
            let segment_hex = segment_id.to_hex();

            let handle = tokio::spawn(async move {
                builder.build(directory.as_ref(), segment_id).await?;
                Ok::<_, Error>(segment_hex)
            });
            handles.push(handle);
        }

        // Wait for all and collect segment IDs
        let mut new_segment_ids = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(Ok(segment_hex)) => new_segment_ids.push(segment_hex),
                Ok(Err(e)) => return Err(e),
                Err(e) => return Err(Error::Io(std::io::Error::other(e))),
            }
        }

        // Update segment list
        if !new_segment_ids.is_empty() {
            let mut segment_ids = self.segment_ids.lock().await;
            segment_ids.extend(new_segment_ids);

            let segments_bytes = serde_json::to_vec(&*segment_ids)
                .map_err(|e| Error::Serialization(e.to_string()))?;
            self.directory
                .write(Path::new("segments.json"), &segments_bytes)
                .await?;
        }

        Ok(())
    }

    /// Merge all segments into one (called explicitly via force_merge)
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_ids.lock().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        // Load segment readers
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        // Merge into new segment
        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Update segment list
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Delete old segments
        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }

    /// Force merge all segments into one
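    ///
    /// Sketch (`ignore` doctest; assumes the `dir` and `config` used to build the
    /// index are in scope): merge an existing multi-segment index down to one segment.
    ///
    /// ```ignore
    /// let writer = IndexWriter::open(dir.clone(), config.clone()).await.unwrap();
    /// writer.force_merge().await.unwrap();
    ///
    /// let index = Index::open(dir, config).await.unwrap();
    /// assert_eq!(index.segment_readers().len(), 1);
    /// ```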
    pub async fn force_merge(&self) -> Result<()> {
        // First commit any pending documents
        self.commit().await?;
        // Then merge all segments
        self.do_merge().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Check postings
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // One segment
        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"

        // Retrieve document
        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5, // Small segments for testing
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Add documents in batches to create multiple segments
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).await.unwrap();
            }
            writer.commit().await.unwrap();
        }

        // Open and check
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Create multiple segments
        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Should have 3 segments
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        // Force merge
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // Should have 1 segment now
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Verify all documents accessible
        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // Test match query with multiple default fields
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        // Test match query with multiple tokens
        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Verify hit has address (segment_id + doc_id)
        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Verify document retrieval by address
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // Also verify doc retrieval directly by global doc_id
        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index with some documents
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Open with slice caching and perform some operations to warm up cache
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        // Perform a search to warm up the cache
        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        // Check cache stats - should have cached some data
        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        // Save the cache
        index.save_slice_cache().await.unwrap();

        // Verify cache file was written
        assert!(
            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
                .await
                .unwrap()
        );

        // Now open with cache loading
        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        // Cache should be prefilled
        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        // Search should still work
        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Create index and add document with multi-value field
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        // Add another document with different uris
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        // Open for reading
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Verify document retrieval preserves all values
        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // Verify to_json returns array for multi-value field
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Verify both values are searchable
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        // Verify searching for non-existent value returns no results
        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
}
1152}