hermes_core/
index.rs

1//! Index - multi-segment async search index
2//!
3//! Components:
4//! - Index: main entry point for searching
5//! - IndexWriter: for adding documents and committing segments
6//! - Supports multiple segments with merge
7
8use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12#[cfg(feature = "native")]
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16#[cfg(feature = "native")]
17use crate::directories::DirectoryWriter;
18use crate::directories::{Directory, SliceCachingDirectory};
19use crate::dsl::{Document, Field, Schema};
20use crate::error::{Error, Result};
21#[cfg(feature = "native")]
22use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentMerger};
23use crate::segment::{SegmentId, SegmentReader};
24use crate::structures::BlockPostingList;
25#[cfg(feature = "native")]
26use crate::tokenizer::BoxedTokenizer;
27
28#[cfg(feature = "native")]
29use rand::Rng;
30#[cfg(feature = "native")]
31use std::sync::atomic::{AtomicUsize, Ordering};
32#[cfg(feature = "native")]
33use tokio::sync::Mutex as AsyncMutex;
34#[cfg(feature = "native")]
35use tokio::sync::mpsc;
36
37/// Default file name for the slice cache
38pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
39
40/// Index configuration
41#[derive(Debug, Clone)]
42pub struct IndexConfig {
43    /// Number of threads for CPU-intensive tasks (search parallelism)
44    pub num_threads: usize,
45    /// Number of parallel segment builders (documents distributed round-robin)
46    pub num_indexing_threads: usize,
47    /// Number of threads for parallel block compression within each segment
48    pub num_compression_threads: usize,
49    /// Block cache size for term dictionary per segment
50    pub term_cache_blocks: usize,
51    /// Block cache size for document store per segment
52    pub store_cache_blocks: usize,
53    /// Max documents per segment before auto-commit
54    pub max_docs_per_segment: u32,
55    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
56    pub optimization: crate::structures::IndexOptimization,
57}
58
59impl Default for IndexConfig {
60    fn default() -> Self {
61        #[cfg(feature = "native")]
62        let cpus = num_cpus::get().max(1);
63        #[cfg(not(feature = "native"))]
64        let cpus = 1;
65
66        Self {
67            num_threads: cpus,
68            num_indexing_threads: 1,
69            num_compression_threads: cpus,
70            term_cache_blocks: 256,
71            store_cache_blocks: 32,
72            max_docs_per_segment: 100_000,
73            optimization: crate::structures::IndexOptimization::default(),
74        }
75    }
76}
77
78/// Multi-segment async Index
79///
80/// The main entry point for searching. Manages multiple segments
81/// and provides unified search across all of them.
82pub struct Index<D: Directory> {
83    directory: Arc<D>,
84    schema: Arc<Schema>,
85    config: IndexConfig,
86    segments: RwLock<Vec<Arc<SegmentReader>>>,
87    default_fields: Vec<crate::Field>,
88    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
89    #[cfg(feature = "native")]
90    thread_pool: Arc<rayon::ThreadPool>,
91}
92
93impl<D: Directory> Index<D> {
94    /// Open an existing index from a directory
95    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
96        let directory = Arc::new(directory);
97
98        // Read schema
99        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
100        let schema_bytes = schema_slice.read_bytes().await?;
101        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
102            .map_err(|e| Error::Serialization(e.to_string()))?;
103        let schema = Arc::new(schema);
104
105        // Read segment list
106        let segments = Self::load_segments(&directory, &schema, &config).await?;
107
108        #[cfg(feature = "native")]
109        let thread_pool = {
110            let pool = rayon::ThreadPoolBuilder::new()
111                .num_threads(config.num_threads)
112                .build()
113                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
114            Arc::new(pool)
115        };
116
117        // Use schema's default_fields if specified, otherwise fall back to all indexed text fields
118        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
119            schema.default_fields().to_vec()
120        } else {
121            schema
122                .fields()
123                .filter(|(_, entry)| {
124                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
125                })
126                .map(|(field, _)| field)
127                .collect()
128        };
129
130        Ok(Self {
131            directory,
132            schema,
133            config,
134            segments: RwLock::new(segments),
135            default_fields,
136            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
137            #[cfg(feature = "native")]
138            thread_pool,
139        })
140    }
141
142    async fn load_segments(
143        directory: &Arc<D>,
144        schema: &Arc<Schema>,
145        config: &IndexConfig,
146    ) -> Result<Vec<Arc<SegmentReader>>> {
147        // Read segments.json which lists all segment IDs
148        let segments_path = Path::new("segments.json");
149        if !directory.exists(segments_path).await? {
150            return Ok(Vec::new());
151        }
152
153        let segments_slice = directory.open_read(segments_path).await?;
154        let segments_bytes = segments_slice.read_bytes().await?;
155        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
156            .map_err(|e| Error::Serialization(e.to_string()))?;
157
158        let mut segments = Vec::new();
159        let mut doc_id_offset = 0u32;
160
161        for id_str in segment_ids {
162            let segment_id = SegmentId::from_hex(&id_str)
163                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
164            let reader = SegmentReader::open(
165                directory.as_ref(),
166                segment_id,
167                Arc::clone(schema),
168                doc_id_offset,
169                config.term_cache_blocks,
170            )
171            .await?;
172
173            doc_id_offset += reader.meta().num_docs;
174            segments.push(Arc::new(reader));
175        }
176
177        Ok(segments)
178    }
179
180    /// Get the schema
181    pub fn schema(&self) -> &Schema {
182        &self.schema
183    }
184
185    /// Get a reference to the underlying directory
186    pub fn directory(&self) -> &D {
187        &self.directory
188    }
189
190    /// Total number of documents across all segments
191    pub fn num_docs(&self) -> u32 {
192        self.segments.read().iter().map(|s| s.num_docs()).sum()
193    }
194
195    /// Get a document by global doc_id (async)
196    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
197        let segments = self.segments.read().clone();
198
199        let mut offset = 0u32;
200        for segment in segments.iter() {
201            let segment_docs = segment.meta().num_docs;
202            if doc_id < offset + segment_docs {
203                let local_doc_id = doc_id - offset;
204                return segment.doc(local_doc_id).await;
205            }
206            offset += segment_docs;
207        }
208
209        Ok(None)
210    }
211
212    /// Get posting lists for a term across all segments (async)
213    pub async fn get_postings(
214        &self,
215        field: Field,
216        term: &[u8],
217    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
218        let segments = self.segments.read().clone();
219        let mut results = Vec::new();
220
221        for segment in segments.iter() {
222            if let Some(postings) = segment.get_postings(field, term).await? {
223                results.push((Arc::clone(segment), postings));
224            }
225        }
226
227        Ok(results)
228    }
229
230    /// Execute CPU-intensive work on thread pool (native only)
231    #[cfg(feature = "native")]
232    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
233    where
234        F: FnOnce() -> R + Send + 'static,
235        R: Send + 'static,
236    {
237        let (tx, rx) = tokio::sync::oneshot::channel();
238        self.thread_pool.spawn(move || {
239            let result = f();
240            let _ = tx.send(result);
241        });
242        rx.await.expect("Thread pool task panicked")
243    }
244
245    /// Get segment readers for query execution
246    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
247        self.segments.read().clone()
248    }
249
250    /// Reload segments from directory (after new segments added)
251    pub async fn reload(&self) -> Result<()> {
252        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
253        *self.segments.write() = new_segments;
254        Ok(())
255    }
256
257    /// Search across all segments
258    pub async fn search(
259        &self,
260        query: &dyn crate::query::Query,
261        limit: usize,
262    ) -> Result<Vec<crate::query::SearchResult>> {
263        let segments = self.segments.read().clone();
264        let mut all_results = Vec::new();
265
266        for segment in &segments {
267            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
268            all_results.extend(results);
269        }
270
271        // Sort by score descending
272        all_results.sort_by(|a, b| {
273            b.score
274                .partial_cmp(&a.score)
275                .unwrap_or(std::cmp::Ordering::Equal)
276        });
277        all_results.truncate(limit);
278
279        Ok(all_results)
280    }
281
282    /// Search and return results with document addresses (no document content)
283    pub async fn search_with_addresses(
284        &self,
285        query: &dyn crate::query::Query,
286        limit: usize,
287    ) -> Result<crate::query::SearchResponse> {
288        let segments = self.segments.read().clone();
289        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
290
291        for segment in &segments {
292            let segment_id = segment.meta().id;
293            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
294            for result in results {
295                all_results.push((segment_id, result));
296            }
297        }
298
299        // Sort by score descending
300        all_results.sort_by(|a, b| {
301            b.1.score
302                .partial_cmp(&a.1.score)
303                .unwrap_or(std::cmp::Ordering::Equal)
304        });
305        all_results.truncate(limit);
306
307        let total_hits = all_results.len() as u32;
308        let hits: Vec<crate::query::SearchHit> = all_results
309            .into_iter()
310            .map(|(segment_id, result)| crate::query::SearchHit {
311                address: crate::query::DocAddress::new(segment_id, result.doc_id),
312                score: result.score,
313            })
314            .collect();
315
316        Ok(crate::query::SearchResponse { hits, total_hits })
317    }
318
319    /// Get a document by its unique address (segment_id + local doc_id)
320    pub async fn get_document(
321        &self,
322        address: &crate::query::DocAddress,
323    ) -> Result<Option<Document>> {
324        let segment_id = address
325            .segment_id_u128()
326            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
327
328        let segments = self.segments.read().clone();
329        for segment in &segments {
330            if segment.meta().id == segment_id {
331                return segment.doc(address.doc_id).await;
332            }
333        }
334
335        Ok(None)
336    }
337
338    /// Get the default fields for this index
339    pub fn default_fields(&self) -> &[crate::Field] {
340        &self.default_fields
341    }
342
343    /// Set the default fields for query parsing
344    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
345        self.default_fields = fields;
346    }
347
348    /// Get the tokenizer registry
349    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
350        &self.tokenizers
351    }
352
353    /// Create a query parser for this index
354    ///
355    /// If the schema contains query router rules, they will be used to route
356    /// queries to specific fields based on regex patterns.
357    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
358        // Check if schema has query routers
359        let query_routers = self.schema.query_routers();
360        if !query_routers.is_empty() {
361            // Try to create a router from the schema's rules
362            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
363                return crate::dsl::QueryLanguageParser::with_router(
364                    Arc::clone(&self.schema),
365                    self.default_fields.clone(),
366                    Arc::clone(&self.tokenizers),
367                    router,
368                );
369            }
370        }
371
372        // Fall back to parser without router
373        crate::dsl::QueryLanguageParser::new(
374            Arc::clone(&self.schema),
375            self.default_fields.clone(),
376            Arc::clone(&self.tokenizers),
377        )
378    }
379
380    /// Parse and search using a query string
381    ///
382    /// Accepts both query language syntax (field:term, AND, OR, NOT, grouping)
383    /// and simple text (tokenized and searched across default fields).
384    /// Returns document addresses (segment_id + doc_id) without document content.
385    pub async fn query(
386        &self,
387        query_str: &str,
388        limit: usize,
389    ) -> Result<crate::query::SearchResponse> {
390        let parser = self.query_parser();
391        let query = parser.parse(query_str).map_err(Error::Query)?;
392        self.search_with_addresses(query.as_ref(), limit).await
393    }
394}
395
396/// Methods for opening index with slice caching
397impl<D: Directory> Index<SliceCachingDirectory<D>> {
398    /// Open an index with slice caching, automatically loading the cache file if present
399    ///
400    /// This wraps the directory in a SliceCachingDirectory and attempts to load
401    /// any existing slice cache file to prefill the cache with hot data.
402    pub async fn open_with_cache(
403        directory: D,
404        config: IndexConfig,
405        cache_max_bytes: usize,
406    ) -> Result<Self> {
407        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
408
409        // Try to load existing slice cache
410        let cache_path = Path::new(SLICE_CACHE_FILENAME);
411        if let Ok(true) = caching_dir.inner().exists(cache_path).await
412            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
413            && let Ok(bytes) = slice.read_bytes().await
414        {
415            let _ = caching_dir.deserialize(bytes.as_slice());
416        }
417
418        Self::open(caching_dir, config).await
419    }
420
421    /// Serialize the current slice cache to the index directory
422    ///
423    /// This saves all cached slices to a single file that can be loaded
424    /// on subsequent index opens for faster startup.
425    #[cfg(feature = "native")]
426    pub async fn save_slice_cache(&self) -> Result<()>
427    where
428        D: DirectoryWriter,
429    {
430        let cache_data = self.directory.serialize();
431        let cache_path = Path::new(SLICE_CACHE_FILENAME);
432        self.directory
433            .inner()
434            .write(cache_path, &cache_data)
435            .await?;
436        Ok(())
437    }
438
439    /// Get slice cache statistics
440    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
441        self.directory.stats()
442    }
443}
444
445/// Warm up the slice cache by opening an index and performing typical read operations
446///
447/// This function opens an index using a SliceCachingDirectory, performs operations
448/// that would typically be done during search (reading term dictionaries, posting lists),
449/// and then serializes the cache to a file for future use.
450///
451/// The resulting cache file contains all the "hot" data that was read during warmup,
452/// allowing subsequent index opens to prefill the cache and avoid cold-start latency.
453#[cfg(feature = "native")]
454pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
455    directory: D,
456    config: IndexConfig,
457    cache_max_bytes: usize,
458) -> Result<()> {
459    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
460    let index = Index::open(caching_dir, config).await?;
461
462    // Warm up by loading segment metadata and term dictionaries
463    // The SegmentReader::open already reads essential metadata
464    // Additional warmup can be done by iterating terms or doing sample queries
465
466    // Save the cache
467    index.save_slice_cache().await?;
468
469    Ok(())
470}
471
472#[cfg(feature = "native")]
473impl<D: Directory> Clone for Index<D> {
474    fn clone(&self) -> Self {
475        Self {
476            directory: Arc::clone(&self.directory),
477            schema: Arc::clone(&self.schema),
478            config: self.config.clone(),
479            segments: RwLock::new(self.segments.read().clone()),
480            default_fields: self.default_fields.clone(),
481            tokenizers: Arc::clone(&self.tokenizers),
482            thread_pool: Arc::clone(&self.thread_pool),
483        }
484    }
485}
486
487/// Async IndexWriter for adding documents and committing segments
488///
489/// Features:
490/// - Parallel indexing with multiple segment builders
491/// - Streams documents to disk immediately (no in-memory document storage)
492/// - Uses string interning for terms (reduced allocations)
493/// - Uses hashbrown HashMap (faster than BTreeMap)
494#[cfg(feature = "native")]
495pub struct IndexWriter<D: DirectoryWriter> {
496    directory: Arc<D>,
497    schema: Arc<Schema>,
498    config: IndexConfig,
499    builder_config: SegmentBuilderConfig,
500    tokenizers: FxHashMap<Field, BoxedTokenizer>,
501    /// Multiple segment builders for parallel indexing
502    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
503    /// List of committed segment IDs (hex strings)
504    segment_ids: Arc<AsyncMutex<Vec<String>>>,
505    /// Channel sender for completed segment IDs from background builds
506    segment_id_sender: mpsc::UnboundedSender<String>,
507    /// Channel receiver for completed segment IDs
508    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
509    /// Count of in-flight background builds
510    pending_builds: AtomicUsize,
511}
512
513#[cfg(feature = "native")]
514impl<D: DirectoryWriter> IndexWriter<D> {
515    /// Create a new index in the directory
516    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
517        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
518    }
519
520    /// Create a new index with custom builder config
521    pub async fn create_with_config(
522        directory: D,
523        schema: Schema,
524        config: IndexConfig,
525        builder_config: SegmentBuilderConfig,
526    ) -> Result<Self> {
527        let directory = Arc::new(directory);
528        let schema = Arc::new(schema);
529
530        // Write schema
531        let schema_bytes =
532            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
533        directory
534            .write(Path::new("schema.json"), &schema_bytes)
535            .await?;
536
537        // Write empty segments list
538        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
539            .map_err(|e| Error::Serialization(e.to_string()))?;
540        directory
541            .write(Path::new("segments.json"), &segments_bytes)
542            .await?;
543
544        // Create multiple builders for parallel indexing
545        let num_builders = config.num_indexing_threads.max(1);
546        let mut builders = Vec::with_capacity(num_builders);
547        for _ in 0..num_builders {
548            builders.push(AsyncMutex::new(None));
549        }
550
551        // Create channel for background builds to report completed segment IDs
552        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
553
554        Ok(Self {
555            directory,
556            schema,
557            config,
558            builder_config,
559            tokenizers: FxHashMap::default(),
560            builders,
561            segment_ids: Arc::new(AsyncMutex::new(Vec::new())),
562            segment_id_sender,
563            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
564            pending_builds: AtomicUsize::new(0),
565        })
566    }
567
568    /// Open an existing index for writing
569    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
570        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
571    }
572
573    /// Open an existing index with custom builder config
574    pub async fn open_with_config(
575        directory: D,
576        config: IndexConfig,
577        builder_config: SegmentBuilderConfig,
578    ) -> Result<Self> {
579        let directory = Arc::new(directory);
580
581        // Read schema
582        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
583        let schema_bytes = schema_slice.read_bytes().await?;
584        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
585            .map_err(|e| Error::Serialization(e.to_string()))?;
586        let schema = Arc::new(schema);
587
588        // Read existing segment IDs (hex strings)
589        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
590        let segments_bytes = segments_slice.read_bytes().await?;
591        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
592            .map_err(|e| Error::Serialization(e.to_string()))?;
593
594        // Create multiple builders for parallel indexing
595        let num_builders = config.num_indexing_threads.max(1);
596        let mut builders = Vec::with_capacity(num_builders);
597        for _ in 0..num_builders {
598            builders.push(AsyncMutex::new(None));
599        }
600
601        // Create channel for background builds to report completed segment IDs
602        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
603
604        Ok(Self {
605            directory,
606            schema,
607            config,
608            builder_config,
609            tokenizers: FxHashMap::default(),
610            builders,
611            segment_ids: Arc::new(AsyncMutex::new(segment_ids)),
612            segment_id_sender,
613            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
614            pending_builds: AtomicUsize::new(0),
615        })
616    }
617
618    /// Get the schema
619    pub fn schema(&self) -> &Schema {
620        &self.schema
621    }
622
623    /// Set tokenizer for a field
624    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
625        self.tokenizers.insert(field, Box::new(tokenizer));
626    }
627
628    /// Add a document
629    ///
630    /// Documents are distributed randomly across multiple builders for parallel indexing.
631    /// Random distribution avoids atomic contention and provides better load balancing.
632    /// When a builder reaches `max_docs_per_segment`, it is committed and a new one starts.
633    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
634        // Random selection of builder - avoids atomic contention
635        let builder_idx = rand::rng().random_range(0..self.builders.len());
636
637        let mut builder_guard = self.builders[builder_idx].lock().await;
638
639        // Initialize builder if needed
640        if builder_guard.is_none() {
641            let mut builder =
642                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
643            for (field, tokenizer) in &self.tokenizers {
644                builder.set_tokenizer(*field, tokenizer.clone_box());
645            }
646            *builder_guard = Some(builder);
647        }
648
649        let builder = builder_guard.as_mut().unwrap();
650        let doc_id = builder.add_document(doc)?;
651
652        // Check if we need to commit
653        if builder.num_docs() >= self.config.max_docs_per_segment {
654            let full_builder = builder_guard.take().unwrap();
655            drop(builder_guard); // Release lock before spawning background task
656            self.spawn_background_build(full_builder);
657        }
658
659        Ok(doc_id)
660    }
661
662    /// Spawn a background task to build a segment without blocking document ingestion
663    ///
664    /// The background task will send its segment ID through the channel when complete,
665    /// allowing indexing to continue immediately.
666    fn spawn_background_build(&self, builder: SegmentBuilder) {
667        let directory = Arc::clone(&self.directory);
668        let segment_id = SegmentId::new();
669        let segment_hex = segment_id.to_hex();
670        let sender = self.segment_id_sender.clone();
671        let segment_ids = Arc::clone(&self.segment_ids);
672
673        self.pending_builds.fetch_add(1, Ordering::SeqCst);
674
675        // Spawn a fully independent task that registers its own segment ID
676        tokio::spawn(async move {
677            match builder.build(directory.as_ref(), segment_id).await {
678                Ok(_) => {
679                    // Register segment ID immediately upon completion
680                    let mut ids = segment_ids.lock().await;
681                    ids.push(segment_hex.clone());
682                    // Also send through channel for flush() to know when all are done
683                    let _ = sender.send(segment_hex);
684                }
685                Err(e) => {
686                    // Log error but don't crash - segment just won't be registered
687                    eprintln!("Background segment build failed: {:?}", e);
688                }
689            }
690        });
691    }
692
693    /// Collect any completed segment IDs from the channel (non-blocking)
694    async fn collect_completed_segments(&self) {
695        let mut receiver = self.segment_id_receiver.lock().await;
696        while let Ok(_segment_hex) = receiver.try_recv() {
697            // Segment ID already registered by the background task
698            self.pending_builds.fetch_sub(1, Ordering::SeqCst);
699        }
700    }
701
702    /// Get the number of pending background builds
703    pub fn pending_build_count(&self) -> usize {
704        self.pending_builds.load(Ordering::SeqCst)
705    }
706
707    /// Get current builder statistics for debugging (aggregated from all builders)
708    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
709        let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;
710
711        for builder_mutex in &self.builders {
712            let guard = builder_mutex.lock().await;
713            if let Some(builder) = guard.as_ref() {
714                let stats = builder.stats();
715                if let Some(ref mut total) = total_stats {
716                    total.num_docs += stats.num_docs;
717                    total.unique_terms += stats.unique_terms;
718                    total.postings_in_memory += stats.postings_in_memory;
719                    total.interned_strings += stats.interned_strings;
720                    total.doc_field_lengths_size += stats.doc_field_lengths_size;
721                } else {
722                    total_stats = Some(stats);
723                }
724            }
725        }
726
727        total_stats
728    }
729
730    /// Flush current builders to background processing (non-blocking)
731    ///
732    /// This takes all current builders with documents and spawns background tasks
733    /// to build them. Returns immediately - use `commit()` for durability.
734    /// New documents can continue to be added while segments are being built.
735    pub async fn flush(&self) -> Result<()> {
736        // Collect any already-completed segments
737        self.collect_completed_segments().await;
738
739        // Take all builders that have documents and spawn background builds
740        for builder_mutex in &self.builders {
741            let mut guard = builder_mutex.lock().await;
742            if let Some(builder) = guard.take()
743                && builder.num_docs() > 0
744            {
745                self.spawn_background_build(builder);
746            }
747        }
748
749        Ok(())
750    }
751
752    /// Commit all pending segments to disk and wait for completion
753    ///
754    /// This flushes any current builders and waits for ALL background builds
755    /// to complete. Provides durability guarantees - all data is persisted.
756    pub async fn commit(&self) -> Result<()> {
757        // First flush any current builders
758        self.flush().await?;
759
760        // Wait for all pending builds to complete
761        let mut receiver = self.segment_id_receiver.lock().await;
762        while self.pending_builds.load(Ordering::SeqCst) > 0 {
763            match receiver.recv().await {
764                Some(_segment_hex) => {
765                    self.pending_builds.fetch_sub(1, Ordering::SeqCst);
766                }
767                None => break, // Channel closed
768            }
769        }
770        drop(receiver);
771
772        // Write segments.json to persist the segment list
773        let segment_ids = self.segment_ids.lock().await;
774        let segments_bytes =
775            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
776        self.directory
777            .write(Path::new("segments.json"), &segments_bytes)
778            .await?;
779
780        Ok(())
781    }
782
783    /// Merge all segments into one (called explicitly via force_merge)
784    async fn do_merge(&self) -> Result<()> {
785        let segment_ids = self.segment_ids.lock().await;
786
787        if segment_ids.len() < 2 {
788            return Ok(());
789        }
790
791        let ids_to_merge: Vec<String> = segment_ids.clone();
792        drop(segment_ids);
793
794        // Load segment readers
795        let mut readers = Vec::new();
796        let mut doc_offset = 0u32;
797
798        for id_str in &ids_to_merge {
799            let segment_id = SegmentId::from_hex(id_str)
800                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
801            let reader = SegmentReader::open(
802                self.directory.as_ref(),
803                segment_id,
804                Arc::clone(&self.schema),
805                doc_offset,
806                self.config.term_cache_blocks,
807            )
808            .await?;
809            doc_offset += reader.meta().num_docs;
810            readers.push(reader);
811        }
812
813        // Merge into new segment
814        let merger = SegmentMerger::new(Arc::clone(&self.schema));
815        let new_segment_id = SegmentId::new();
816        merger
817            .merge(self.directory.as_ref(), &readers, new_segment_id)
818            .await?;
819
820        // Update segment list
821        let mut segment_ids = self.segment_ids.lock().await;
822        segment_ids.clear();
823        segment_ids.push(new_segment_id.to_hex());
824
825        let segments_bytes =
826            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
827        self.directory
828            .write(Path::new("segments.json"), &segments_bytes)
829            .await?;
830
831        // Delete old segments
832        for id_str in ids_to_merge {
833            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
834                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
835            }
836        }
837
838        Ok(())
839    }
840
841    /// Force merge all segments into one
842    pub async fn force_merge(&self) -> Result<()> {
843        // First commit all pending documents (waits for completion)
844        self.commit().await?;
845        // Then merge all segments
846        self.do_merge().await
847    }
848}
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use crate::directories::RamDirectory;
854    use crate::dsl::SchemaBuilder;
855
856    #[tokio::test]
857    async fn test_index_create_and_search() {
858        let mut schema_builder = SchemaBuilder::default();
859        let title = schema_builder.add_text_field("title", true, true);
860        let body = schema_builder.add_text_field("body", true, true);
861        let schema = schema_builder.build();
862
863        let dir = RamDirectory::new();
864        let config = IndexConfig::default();
865
866        // Create index and add documents
867        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
868            .await
869            .unwrap();
870
871        let mut doc1 = Document::new();
872        doc1.add_text(title, "Hello World");
873        doc1.add_text(body, "This is the first document");
874        writer.add_document(doc1).await.unwrap();
875
876        let mut doc2 = Document::new();
877        doc2.add_text(title, "Goodbye World");
878        doc2.add_text(body, "This is the second document");
879        writer.add_document(doc2).await.unwrap();
880
881        writer.commit().await.unwrap();
882
883        // Open for reading
884        let index = Index::open(dir, config).await.unwrap();
885        assert_eq!(index.num_docs(), 2);
886
887        // Check postings
888        let postings = index.get_postings(title, b"world").await.unwrap();
889        assert_eq!(postings.len(), 1); // One segment
890        assert_eq!(postings[0].1.doc_count(), 2); // Two docs with "world"
891
892        // Retrieve document
893        let doc = index.doc(0).await.unwrap().unwrap();
894        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
895    }
896
897    #[tokio::test]
898    async fn test_multiple_segments() {
899        let mut schema_builder = SchemaBuilder::default();
900        let title = schema_builder.add_text_field("title", true, true);
901        let schema = schema_builder.build();
902
903        let dir = RamDirectory::new();
904        let config = IndexConfig {
905            max_docs_per_segment: 5, // Small segments for testing
906            ..Default::default()
907        };
908
909        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
910            .await
911            .unwrap();
912
913        // Add documents in batches to create multiple segments
914        for batch in 0..3 {
915            for i in 0..5 {
916                let mut doc = Document::new();
917                doc.add_text(title, format!("Document {} batch {}", i, batch));
918                writer.add_document(doc).await.unwrap();
919            }
920            writer.commit().await.unwrap();
921        }
922
923        // Open and check
924        let index = Index::open(dir, config).await.unwrap();
925        assert_eq!(index.num_docs(), 15);
926        assert_eq!(index.segment_readers().len(), 3);
927    }
928
929    #[tokio::test]
930    async fn test_segment_merge() {
931        let mut schema_builder = SchemaBuilder::default();
932        let title = schema_builder.add_text_field("title", true, true);
933        let schema = schema_builder.build();
934
935        let dir = RamDirectory::new();
936        let config = IndexConfig {
937            max_docs_per_segment: 3,
938            ..Default::default()
939        };
940
941        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
942            .await
943            .unwrap();
944
945        // Create multiple segments
946        for i in 0..9 {
947            let mut doc = Document::new();
948            doc.add_text(title, format!("Document {}", i));
949            writer.add_document(doc).await.unwrap();
950        }
951        writer.commit().await.unwrap();
952
953        // Should have 3 segments
954        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
955        assert_eq!(index.segment_readers().len(), 3);
956
957        // Force merge
958        let writer = IndexWriter::open(dir.clone(), config.clone())
959            .await
960            .unwrap();
961        writer.force_merge().await.unwrap();
962
963        // Should have 1 segment now
964        let index = Index::open(dir, config).await.unwrap();
965        assert_eq!(index.segment_readers().len(), 1);
966        assert_eq!(index.num_docs(), 9);
967
968        // Verify all documents accessible
969        for i in 0..9 {
970            let doc = index.doc(i).await.unwrap().unwrap();
971            assert_eq!(
972                doc.get_first(title).unwrap().as_text(),
973                Some(format!("Document {}", i).as_str())
974            );
975        }
976    }
977
978    #[tokio::test]
979    async fn test_match_query() {
980        let mut schema_builder = SchemaBuilder::default();
981        let title = schema_builder.add_text_field("title", true, true);
982        let body = schema_builder.add_text_field("body", true, true);
983        let schema = schema_builder.build();
984
985        let dir = RamDirectory::new();
986        let config = IndexConfig::default();
987
988        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
989            .await
990            .unwrap();
991
992        let mut doc1 = Document::new();
993        doc1.add_text(title, "rust programming");
994        doc1.add_text(body, "Learn rust language");
995        writer.add_document(doc1).await.unwrap();
996
997        let mut doc2 = Document::new();
998        doc2.add_text(title, "python programming");
999        doc2.add_text(body, "Learn python language");
1000        writer.add_document(doc2).await.unwrap();
1001
1002        writer.commit().await.unwrap();
1003
1004        let index = Index::open(dir, config).await.unwrap();
1005
1006        // Test match query with multiple default fields
1007        let results = index.query("rust", 10).await.unwrap();
1008        assert_eq!(results.hits.len(), 1);
1009
1010        // Test match query with multiple tokens
1011        let results = index.query("rust programming", 10).await.unwrap();
1012        assert!(!results.hits.is_empty());
1013
1014        // Verify hit has address (segment_id + doc_id)
1015        let hit = &results.hits[0];
1016        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
1017
1018        // Verify document retrieval by address
1019        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
1020        assert!(
1021            !doc.field_values().is_empty(),
1022            "Doc should have field values"
1023        );
1024
1025        // Also verify doc retrieval directly by global doc_id
1026        let doc = index.doc(0).await.unwrap().unwrap();
1027        assert!(
1028            !doc.field_values().is_empty(),
1029            "Doc should have field values"
1030        );
1031    }
1032
1033    #[tokio::test]
1034    async fn test_slice_cache_warmup_and_load() {
1035        use crate::directories::SliceCachingDirectory;
1036
1037        let mut schema_builder = SchemaBuilder::default();
1038        let title = schema_builder.add_text_field("title", true, true);
1039        let body = schema_builder.add_text_field("body", true, true);
1040        let schema = schema_builder.build();
1041
1042        let dir = RamDirectory::new();
1043        let config = IndexConfig::default();
1044
1045        // Create index with some documents
1046        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1047            .await
1048            .unwrap();
1049
1050        for i in 0..10 {
1051            let mut doc = Document::new();
1052            doc.add_text(title, format!("Document {} about rust", i));
1053            doc.add_text(body, format!("This is body text number {}", i));
1054            writer.add_document(doc).await.unwrap();
1055        }
1056        writer.commit().await.unwrap();
1057
1058        // Open with slice caching and perform some operations to warm up cache
1059        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
1060        let index = Index::open(caching_dir, config.clone()).await.unwrap();
1061
1062        // Perform a search to warm up the cache
1063        let results = index.query("rust", 10).await.unwrap();
1064        assert!(!results.hits.is_empty());
1065
1066        // Check cache stats - should have cached some data
1067        let stats = index.slice_cache_stats();
1068        assert!(stats.total_bytes > 0, "Cache should have data after search");
1069
1070        // Save the cache
1071        index.save_slice_cache().await.unwrap();
1072
1073        // Verify cache file was written
1074        assert!(
1075            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
1076                .await
1077                .unwrap()
1078        );
1079
1080        // Now open with cache loading
1081        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
1082            .await
1083            .unwrap();
1084
1085        // Cache should be prefilled
1086        let stats2 = index2.slice_cache_stats();
1087        assert!(
1088            stats2.total_bytes > 0,
1089            "Cache should be prefilled from file"
1090        );
1091
1092        // Search should still work
1093        let results2 = index2.query("rust", 10).await.unwrap();
1094        assert_eq!(results.hits.len(), results2.hits.len());
1095    }
1096
1097    #[tokio::test]
1098    async fn test_multivalue_field_indexing_and_search() {
1099        let mut schema_builder = SchemaBuilder::default();
1100        let uris = schema_builder.add_text_field("uris", true, true);
1101        let title = schema_builder.add_text_field("title", true, true);
1102        let schema = schema_builder.build();
1103
1104        let dir = RamDirectory::new();
1105        let config = IndexConfig::default();
1106
1107        // Create index and add document with multi-value field
1108        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1109            .await
1110            .unwrap();
1111
1112        let mut doc = Document::new();
1113        doc.add_text(uris, "one");
1114        doc.add_text(uris, "two");
1115        doc.add_text(title, "Test Document");
1116        writer.add_document(doc).await.unwrap();
1117
1118        // Add another document with different uris
1119        let mut doc2 = Document::new();
1120        doc2.add_text(uris, "three");
1121        doc2.add_text(title, "Another Document");
1122        writer.add_document(doc2).await.unwrap();
1123
1124        writer.commit().await.unwrap();
1125
1126        // Open for reading
1127        let index = Index::open(dir, config).await.unwrap();
1128        assert_eq!(index.num_docs(), 2);
1129
1130        // Verify document retrieval preserves all values
1131        let doc = index.doc(0).await.unwrap().unwrap();
1132        let all_uris: Vec<_> = doc.get_all(uris).collect();
1133        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
1134        assert_eq!(all_uris[0].as_text(), Some("one"));
1135        assert_eq!(all_uris[1].as_text(), Some("two"));
1136
1137        // Verify to_json returns array for multi-value field
1138        let json = doc.to_json(index.schema());
1139        let uris_json = json.get("uris").unwrap();
1140        assert!(uris_json.is_array(), "Multi-value field should be an array");
1141        let uris_arr = uris_json.as_array().unwrap();
1142        assert_eq!(uris_arr.len(), 2);
1143        assert_eq!(uris_arr[0].as_str(), Some("one"));
1144        assert_eq!(uris_arr[1].as_str(), Some("two"));
1145
1146        // Verify both values are searchable
1147        let results = index.query("uris:one", 10).await.unwrap();
1148        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
1149        assert_eq!(results.hits[0].address.doc_id, 0);
1150
1151        let results = index.query("uris:two", 10).await.unwrap();
1152        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
1153        assert_eq!(results.hits[0].address.doc_id, 0);
1154
1155        let results = index.query("uris:three", 10).await.unwrap();
1156        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
1157        assert_eq!(results.hits[0].address.doc_id, 1);
1158
1159        // Verify searching for non-existent value returns no results
1160        let results = index.query("uris:nonexistent", 10).await.unwrap();
1161        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
1162    }
1163}