Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39    index_documents_from_reader, index_json_document, parse_schema,
40};
41
42/// Default file name for the slice cache
43pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
45/// Index configuration
46#[derive(Debug, Clone)]
47pub struct IndexConfig {
48    /// Number of threads for CPU-intensive tasks (search parallelism)
49    pub num_threads: usize,
50    /// Number of parallel segment builders (documents distributed round-robin)
51    pub num_indexing_threads: usize,
52    /// Number of threads for parallel block compression within each segment
53    pub num_compression_threads: usize,
54    /// Block cache size for term dictionary per segment
55    pub term_cache_blocks: usize,
56    /// Block cache size for document store per segment
57    pub store_cache_blocks: usize,
58    /// Max memory (bytes) across all builders before auto-commit (global limit)
59    pub max_indexing_memory_bytes: usize,
60    /// Merge policy for background segment merging
61    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
62    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
63    pub optimization: crate::structures::IndexOptimization,
64    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
65    pub reload_interval_ms: u64,
66    /// Maximum number of concurrent background merges (default: 4)
67    pub max_concurrent_merges: usize,
68}
69
70impl Default for IndexConfig {
71    fn default() -> Self {
72        #[cfg(feature = "native")]
73        let indexing_threads = crate::default_indexing_threads();
74        #[cfg(not(feature = "native"))]
75        let indexing_threads = 1;
76
77        #[cfg(feature = "native")]
78        let compression_threads = crate::default_compression_threads();
79        #[cfg(not(feature = "native"))]
80        let compression_threads = 1;
81
82        Self {
83            num_threads: indexing_threads,
84            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
85            num_compression_threads: compression_threads,
86            term_cache_blocks: 256,
87            store_cache_blocks: 32,
88            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
89            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90            optimization: crate::structures::IndexOptimization::default(),
91            reload_interval_ms: 1000, // 1 second default
92            max_concurrent_merges: 4,
93        }
94    }
95}
96
97/// Multi-segment async Index
98///
99/// The central concept for search. Owns segment lifecycle and provides:
100/// - `Index::create()` / `Index::open()` - create or open an index
101/// - `index.writer()` - get an IndexWriter for adding documents
102/// - `index.reader()` - get an IndexReader for searching with reload policy
103///
104/// All segment management is delegated to SegmentManager.
105#[cfg(feature = "native")]
106pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
107    directory: Arc<D>,
108    schema: Arc<Schema>,
109    config: IndexConfig,
110    /// Segment manager - owns segments, tracker, metadata, and trained structures
111    segment_manager: Arc<crate::merge::SegmentManager<D>>,
112    /// Cached reader (created lazily, reused across calls)
113    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
114}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118    /// Create a new index in the directory
119    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120        let directory = Arc::new(directory);
121        let schema = Arc::new(schema);
122        let metadata = IndexMetadata::new((*schema).clone());
123
124        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125            Arc::clone(&directory),
126            Arc::clone(&schema),
127            metadata,
128            config.merge_policy.clone_box(),
129            config.term_cache_blocks,
130            config.max_concurrent_merges,
131        ));
132
133        // Save initial metadata
134        segment_manager.update_metadata(|_| {}).await?;
135
136        Ok(Self {
137            directory,
138            schema,
139            config,
140            segment_manager,
141            cached_reader: tokio::sync::OnceCell::new(),
142        })
143    }
144
145    /// Open an existing index from a directory
146    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
147        let directory = Arc::new(directory);
148
149        // Load metadata (includes schema)
150        let metadata = IndexMetadata::load(directory.as_ref()).await?;
151        let schema = Arc::new(metadata.schema.clone());
152
153        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
154            Arc::clone(&directory),
155            Arc::clone(&schema),
156            metadata,
157            config.merge_policy.clone_box(),
158            config.term_cache_blocks,
159            config.max_concurrent_merges,
160        ));
161
162        // Load trained structures into SegmentManager's ArcSwap
163        segment_manager.load_and_publish_trained().await;
164
165        Ok(Self {
166            directory,
167            schema,
168            config,
169            segment_manager,
170            cached_reader: tokio::sync::OnceCell::new(),
171        })
172    }
173
174    /// Get the schema
175    pub fn schema(&self) -> &Schema {
176        &self.schema
177    }
178
179    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
180    pub fn schema_arc(&self) -> &Arc<Schema> {
181        &self.schema
182    }
183
184    /// Get a reference to the underlying directory
185    pub fn directory(&self) -> &D {
186        &self.directory
187    }
188
189    /// Get the segment manager
190    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
191        &self.segment_manager
192    }
193
194    /// Get an IndexReader for searching (with reload policy)
195    ///
196    /// The reader is cached and reused across calls. The reader's internal
197    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
198    pub async fn reader(&self) -> Result<&IndexReader<D>> {
199        self.cached_reader
200            .get_or_try_init(|| async {
201                IndexReader::from_segment_manager(
202                    Arc::clone(&self.schema),
203                    Arc::clone(&self.segment_manager),
204                    self.config.term_cache_blocks,
205                    self.config.reload_interval_ms,
206                )
207                .await
208            })
209            .await
210    }
211
212    /// Get the config
213    pub fn config(&self) -> &IndexConfig {
214        &self.config
215    }
216
217    /// Get segment readers for query execution (convenience method)
218    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
219        let reader = self.reader().await?;
220        let searcher = reader.searcher().await?;
221        Ok(searcher.segment_readers().to_vec())
222    }
223
224    /// Total number of documents across all segments
225    pub async fn num_docs(&self) -> Result<u32> {
226        let reader = self.reader().await?;
227        let searcher = reader.searcher().await?;
228        Ok(searcher.num_docs())
229    }
230
231    /// Get default fields for search
232    pub fn default_fields(&self) -> Vec<crate::Field> {
233        if !self.schema.default_fields().is_empty() {
234            self.schema.default_fields().to_vec()
235        } else {
236            self.schema
237                .fields()
238                .filter(|(_, entry)| {
239                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240                })
241                .map(|(field, _)| field)
242                .collect()
243        }
244    }
245
246    /// Get tokenizer registry
247    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
248        Arc::new(crate::tokenizer::TokenizerRegistry::default())
249    }
250
251    /// Create a query parser for this index
252    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
253        let default_fields = self.default_fields();
254        let tokenizers = self.tokenizers();
255
256        let query_routers = self.schema.query_routers();
257        if !query_routers.is_empty()
258            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
259        {
260            return crate::dsl::QueryLanguageParser::with_router(
261                Arc::clone(&self.schema),
262                default_fields,
263                tokenizers,
264                router,
265            );
266        }
267
268        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
269    }
270
271    /// Parse and search using a query string
272    pub async fn query(
273        &self,
274        query_str: &str,
275        limit: usize,
276    ) -> Result<crate::query::SearchResponse> {
277        self.query_offset(query_str, limit, 0).await
278    }
279
280    /// Query with offset for pagination
281    pub async fn query_offset(
282        &self,
283        query_str: &str,
284        limit: usize,
285        offset: usize,
286    ) -> Result<crate::query::SearchResponse> {
287        let parser = self.query_parser();
288        let query = parser
289            .parse(query_str)
290            .map_err(crate::error::Error::Query)?;
291        self.search_offset(query.as_ref(), limit, offset).await
292    }
293
294    /// Search and return results
295    pub async fn search(
296        &self,
297        query: &dyn crate::query::Query,
298        limit: usize,
299    ) -> Result<crate::query::SearchResponse> {
300        self.search_offset(query, limit, 0).await
301    }
302
303    /// Search with offset for pagination
304    pub async fn search_offset(
305        &self,
306        query: &dyn crate::query::Query,
307        limit: usize,
308        offset: usize,
309    ) -> Result<crate::query::SearchResponse> {
310        let reader = self.reader().await?;
311        let searcher = reader.searcher().await?;
312
313        #[cfg(feature = "sync")]
314        let (results, total_seen) = {
315            // Sync search: rayon handles segment parallelism internally.
316            // On multi-threaded tokio, use block_in_place to yield the worker;
317            // on single-threaded (tests), call directly.
318            let runtime_flavor = tokio::runtime::Handle::current().runtime_flavor();
319            if runtime_flavor == tokio::runtime::RuntimeFlavor::MultiThread {
320                tokio::task::block_in_place(|| {
321                    searcher.search_with_offset_and_count_sync(query, limit, offset)
322                })?
323            } else {
324                searcher.search_with_offset_and_count_sync(query, limit, offset)?
325            }
326        };
327
328        #[cfg(not(feature = "sync"))]
329        let (results, total_seen) = {
330            searcher
331                .search_with_offset_and_count(query, limit, offset)
332                .await?
333        };
334
335        let total_hits = total_seen;
336        let hits: Vec<crate::query::SearchHit> = results
337            .into_iter()
338            .map(|result| crate::query::SearchHit {
339                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
340                score: result.score,
341                matched_fields: result.extract_ordinals(),
342            })
343            .collect();
344
345        Ok(crate::query::SearchResponse { hits, total_hits })
346    }
347
348    /// Get a document by its unique address
349    pub async fn get_document(
350        &self,
351        address: &crate::query::DocAddress,
352    ) -> Result<Option<crate::dsl::Document>> {
353        let reader = self.reader().await?;
354        let searcher = reader.searcher().await?;
355        searcher.get_document(address).await
356    }
357
358    /// Get posting lists for a term across all segments
359    pub async fn get_postings(
360        &self,
361        field: crate::Field,
362        term: &[u8],
363    ) -> Result<
364        Vec<(
365            Arc<crate::segment::SegmentReader>,
366            crate::structures::BlockPostingList,
367        )>,
368    > {
369        let segments = self.segment_readers().await?;
370        let mut results = Vec::new();
371
372        for segment in segments {
373            if let Some(postings) = segment.get_postings(field, term).await? {
374                results.push((segment, postings));
375            }
376        }
377
378        Ok(results)
379    }
380}
381
382/// Native-only methods for Index
383#[cfg(feature = "native")]
384impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
385    /// Get an IndexWriter for adding documents
386    pub fn writer(&self) -> writer::IndexWriter<D> {
387        writer::IndexWriter::from_index(self)
388    }
389}
390
391#[cfg(test)]
392mod tests;
393
394// (tests moved to index/tests/ module)