Skip to main content

hermes_core/index/
mod.rs

//! Index - multi-segment async search index
//!
//! The `Index` is the central type and provides:
//! - `Index::create()` / `Index::open()` - create or open an index
//! - `index.writer()` - get an `IndexWriter` for adding documents
//! - `index.reader()` - get an `IndexReader` for searching (with a reload policy)
//!
//! The `Index` owns the `SegmentManager`, which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod primary_key;
22#[cfg(feature = "native")]
23mod reader;
24#[cfg(feature = "native")]
25mod vector_builder;
26#[cfg(feature = "native")]
27mod writer;
28#[cfg(feature = "native")]
29pub use primary_key::PrimaryKeyIndex;
30#[cfg(feature = "native")]
31pub use reader::IndexReader;
32#[cfg(feature = "native")]
33pub use writer::{IndexWriter, PreparedCommit};
34
35mod metadata;
36pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
37
38#[cfg(feature = "native")]
39mod helpers;
40#[cfg(feature = "native")]
41pub use helpers::{
42    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
43    index_documents_from_reader, index_json_document, parse_schema,
44};
45
46/// Default file name for the slice cache
47pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
48
49/// Index configuration
50#[derive(Debug, Clone)]
51pub struct IndexConfig {
52    /// Number of threads for CPU-intensive tasks (search parallelism)
53    pub num_threads: usize,
54    /// Number of parallel segment builders (documents distributed round-robin)
55    pub num_indexing_threads: usize,
56    /// Number of threads for parallel block compression within each segment
57    pub num_compression_threads: usize,
58    /// Block cache size for term dictionary per segment
59    pub term_cache_blocks: usize,
60    /// Block cache size for document store per segment
61    pub store_cache_blocks: usize,
62    /// Max memory (bytes) across all builders before auto-commit (global limit)
63    pub max_indexing_memory_bytes: usize,
64    /// Merge policy for background segment merging
65    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
66    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
67    pub optimization: crate::structures::IndexOptimization,
68    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
69    pub reload_interval_ms: u64,
70    /// Maximum number of concurrent background merges (default: 4)
71    pub max_concurrent_merges: usize,
72}
73
74impl Default for IndexConfig {
75    fn default() -> Self {
76        #[cfg(feature = "native")]
77        let indexing_threads = crate::default_indexing_threads();
78        #[cfg(not(feature = "native"))]
79        let indexing_threads = 1;
80
81        #[cfg(feature = "native")]
82        let compression_threads = crate::default_compression_threads();
83        #[cfg(not(feature = "native"))]
84        let compression_threads = 1;
85
86        Self {
87            num_threads: indexing_threads,
88            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
89            num_compression_threads: compression_threads,
90            term_cache_blocks: 256,
91            store_cache_blocks: 32,
92            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
93            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
94            optimization: crate::structures::IndexOptimization::default(),
95            reload_interval_ms: 1000, // 1 second default
96            max_concurrent_merges: 4,
97        }
98    }
99}
100
/// Multi-segment, async search index.
///
/// This is the entry point for search:
/// - create or open an index with `Index::create` / `Index::open`;
/// - add documents through `Index::writer`;
/// - search through the cached reader returned by `Index::reader`, which reloads on an interval.
///
/// Segment bookkeeping is delegated to the shared `SegmentManager`.
#[cfg(feature = "native")]
pub struct Index<D>
where
    D: crate::directories::DirectoryWriter + 'static,
{
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Shared segment manager; owns segments, tracker, metadata and trained structures.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Reader built lazily on first use by `Index::reader` and reused afterwards.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
119
120#[cfg(feature = "native")]
121impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
122    /// Create a new index in the directory
123    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
124        let directory = Arc::new(directory);
125        let schema = Arc::new(schema);
126        let metadata = IndexMetadata::new((*schema).clone());
127
128        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
129            Arc::clone(&directory),
130            Arc::clone(&schema),
131            metadata,
132            config.merge_policy.clone_box(),
133            config.term_cache_blocks,
134            config.max_concurrent_merges,
135        ));
136
137        // Save initial metadata
138        segment_manager.update_metadata(|_| {}).await?;
139
140        Ok(Self {
141            directory,
142            schema,
143            config,
144            segment_manager,
145            cached_reader: tokio::sync::OnceCell::new(),
146        })
147    }
148
149    /// Open an existing index from a directory
150    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
151        let directory = Arc::new(directory);
152
153        // Load metadata (includes schema)
154        let metadata = IndexMetadata::load(directory.as_ref()).await?;
155        let schema = Arc::new(metadata.schema.clone());
156
157        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
158            Arc::clone(&directory),
159            Arc::clone(&schema),
160            metadata,
161            config.merge_policy.clone_box(),
162            config.term_cache_blocks,
163            config.max_concurrent_merges,
164        ));
165
166        // Load trained structures into SegmentManager's ArcSwap
167        segment_manager.load_and_publish_trained().await;
168
169        Ok(Self {
170            directory,
171            schema,
172            config,
173            segment_manager,
174            cached_reader: tokio::sync::OnceCell::new(),
175        })
176    }
177
178    /// Get the schema
179    pub fn schema(&self) -> &Schema {
180        &self.schema
181    }
182
183    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
184    pub fn schema_arc(&self) -> &Arc<Schema> {
185        &self.schema
186    }
187
188    /// Get a reference to the underlying directory
189    pub fn directory(&self) -> &D {
190        &self.directory
191    }
192
193    /// Get the segment manager
194    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
195        &self.segment_manager
196    }
197
198    /// Get an IndexReader for searching (with reload policy)
199    ///
200    /// The reader is cached and reused across calls. The reader's internal
201    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
202    pub async fn reader(&self) -> Result<&IndexReader<D>> {
203        self.cached_reader
204            .get_or_try_init(|| async {
205                IndexReader::from_segment_manager(
206                    Arc::clone(&self.schema),
207                    Arc::clone(&self.segment_manager),
208                    self.config.term_cache_blocks,
209                    self.config.reload_interval_ms,
210                )
211                .await
212            })
213            .await
214    }
215
216    /// Get the config
217    pub fn config(&self) -> &IndexConfig {
218        &self.config
219    }
220
221    /// Get segment readers for query execution (convenience method)
222    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
223        let reader = self.reader().await?;
224        let searcher = reader.searcher().await?;
225        Ok(searcher.segment_readers().to_vec())
226    }
227
228    /// Total number of documents across all segments
229    pub async fn num_docs(&self) -> Result<u32> {
230        let reader = self.reader().await?;
231        let searcher = reader.searcher().await?;
232        Ok(searcher.num_docs())
233    }
234
235    /// Get default fields for search
236    pub fn default_fields(&self) -> Vec<crate::Field> {
237        if !self.schema.default_fields().is_empty() {
238            self.schema.default_fields().to_vec()
239        } else {
240            self.schema
241                .fields()
242                .filter(|(_, entry)| {
243                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
244                })
245                .map(|(field, _)| field)
246                .collect()
247        }
248    }
249
250    /// Get tokenizer registry
251    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
252        Arc::new(crate::tokenizer::TokenizerRegistry::default())
253    }
254
255    /// Create a query parser for this index
256    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
257        let default_fields = self.default_fields();
258        let tokenizers = self.tokenizers();
259
260        let query_routers = self.schema.query_routers();
261        if !query_routers.is_empty()
262            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
263        {
264            return crate::dsl::QueryLanguageParser::with_router(
265                Arc::clone(&self.schema),
266                default_fields,
267                tokenizers,
268                router,
269            );
270        }
271
272        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
273    }
274
275    /// Parse and search using a query string
276    pub async fn query(
277        &self,
278        query_str: &str,
279        limit: usize,
280    ) -> Result<crate::query::SearchResponse> {
281        self.query_offset(query_str, limit, 0).await
282    }
283
284    /// Query with offset for pagination
285    pub async fn query_offset(
286        &self,
287        query_str: &str,
288        limit: usize,
289        offset: usize,
290    ) -> Result<crate::query::SearchResponse> {
291        let parser = self.query_parser();
292        let query = parser
293            .parse(query_str)
294            .map_err(crate::error::Error::Query)?;
295        self.search_offset(query.as_ref(), limit, offset).await
296    }
297
298    /// Search and return results
299    pub async fn search(
300        &self,
301        query: &dyn crate::query::Query,
302        limit: usize,
303    ) -> Result<crate::query::SearchResponse> {
304        self.search_offset(query, limit, 0).await
305    }
306
307    /// Search with offset for pagination
308    pub async fn search_offset(
309        &self,
310        query: &dyn crate::query::Query,
311        limit: usize,
312        offset: usize,
313    ) -> Result<crate::query::SearchResponse> {
314        let reader = self.reader().await?;
315        let searcher = reader.searcher().await?;
316
317        #[cfg(feature = "sync")]
318        let (results, total_seen) = {
319            // Sync search: rayon handles segment parallelism internally.
320            // On multi-threaded tokio, use block_in_place to yield the worker;
321            // on single-threaded (tests), call directly.
322            let runtime_flavor = tokio::runtime::Handle::current().runtime_flavor();
323            if runtime_flavor == tokio::runtime::RuntimeFlavor::MultiThread {
324                tokio::task::block_in_place(|| {
325                    searcher.search_with_offset_and_count_sync(query, limit, offset)
326                })?
327            } else {
328                searcher.search_with_offset_and_count_sync(query, limit, offset)?
329            }
330        };
331
332        #[cfg(not(feature = "sync"))]
333        let (results, total_seen) = {
334            searcher
335                .search_with_offset_and_count(query, limit, offset)
336                .await?
337        };
338
339        let total_hits = total_seen;
340        let hits: Vec<crate::query::SearchHit> = results
341            .into_iter()
342            .map(|result| crate::query::SearchHit {
343                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
344                score: result.score,
345                matched_fields: result.extract_ordinals(),
346            })
347            .collect();
348
349        Ok(crate::query::SearchResponse { hits, total_hits })
350    }
351
352    /// Get a document by its unique address
353    pub async fn get_document(
354        &self,
355        address: &crate::query::DocAddress,
356    ) -> Result<Option<crate::dsl::Document>> {
357        let reader = self.reader().await?;
358        let searcher = reader.searcher().await?;
359        searcher.get_document(address).await
360    }
361
362    /// Get posting lists for a term across all segments
363    pub async fn get_postings(
364        &self,
365        field: crate::Field,
366        term: &[u8],
367    ) -> Result<
368        Vec<(
369            Arc<crate::segment::SegmentReader>,
370            crate::structures::BlockPostingList,
371        )>,
372    > {
373        let segments = self.segment_readers().await?;
374        let mut results = Vec::new();
375
376        for segment in segments {
377            if let Some(postings) = segment.get_postings(field, term).await? {
378                results.push((segment, postings));
379            }
380        }
381
382        Ok(results)
383    }
384}
385
/// Writer construction for `Index`.
///
/// Like the rest of `Index`, this block is compiled only with the `native` feature.
#[cfg(feature = "native")]
impl<D> Index<D>
where
    D: crate::directories::DirectoryWriter + 'static,
{
    /// Returns an `IndexWriter` for adding documents to this index.
    ///
    /// Each call constructs a writer via `IndexWriter::from_index(self)`.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}
393}
394
395#[cfg(test)]
396mod tests;
397
// Unit tests live in the `tests` submodule (`index/tests/`), compiled only under `cfg(test)`.