Skip to main content

hermes_core/index/
mod.rs

1//! Index - multi-segment async search index
2//!
3//! The `Index` is the central concept that provides:
4//! - `Index::create()` / `Index::open()` - create or open an index
5//! - `index.writer()` - get an IndexWriter for adding documents
6//! - `index.reader()` - get an IndexReader for searching (with reload policy)
7//!
8//! The Index owns the SegmentManager which handles segment lifecycle and tracking.
9
10#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod primary_key;
22#[cfg(feature = "native")]
23mod reader;
24#[cfg(feature = "native")]
25mod vector_builder;
26#[cfg(all(feature = "wasm", not(feature = "native")))]
27mod wasm_writer;
28#[cfg(feature = "native")]
29mod writer;
30#[cfg(feature = "native")]
31pub use primary_key::PrimaryKeyIndex;
32#[cfg(feature = "native")]
33pub use reader::IndexReader;
34#[cfg(all(feature = "wasm", not(feature = "native")))]
35pub use wasm_writer::IndexWriter as WasmIndexWriter;
36#[cfg(feature = "native")]
37pub use writer::{IndexWriter, PreparedCommit};
38
39mod metadata;
40pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
41
42#[cfg(feature = "native")]
43mod helpers;
44#[cfg(feature = "native")]
45pub use helpers::{
46    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
47    index_documents_from_reader, index_json_document, parse_schema,
48};
49
/// Default file name for the slice cache.
///
/// NOTE(review): nothing in the visible part of this module reads this
/// constant; presumably it is consumed by the slice-cache code elsewhere in
/// the crate — confirm before renaming or changing the value.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
52
/// Index configuration
///
/// Passed to `Index::create()` / `Index::open()` and kept by the `Index` for
/// its whole lifetime. The values quoted as "default" below are the ones set
/// by the `Default` implementation of this type.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads for CPU-intensive tasks (search parallelism)
    pub num_threads: usize,
    /// Number of parallel segment builders (documents distributed round-robin)
    /// (default: 1)
    pub num_indexing_threads: usize,
    /// Number of threads for parallel block compression within each segment
    pub num_compression_threads: usize,
    /// Block cache size for term dictionary per segment (default: 256)
    pub term_cache_blocks: usize,
    /// Block cache size for document store per segment (default: 32)
    pub store_cache_blocks: usize,
    /// Max memory (bytes) across all builders before auto-commit (global limit)
    /// (default: 256 MB)
    pub max_indexing_memory_bytes: usize,
    /// Merge policy for background segment merging
    /// (default: `TieredMergePolicy::default()`)
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization mode (adaptive, size-optimized, performance-optimized)
    pub optimization: crate::structures::IndexOptimization,
    /// Reload interval in milliseconds for IndexReader (how often to check for new segments)
    /// (default: 1000)
    pub reload_interval_ms: u64,
    /// Maximum number of concurrent background merges (default: 4)
    pub max_concurrent_merges: usize,
}
77
78impl Default for IndexConfig {
79    fn default() -> Self {
80        #[cfg(feature = "native")]
81        let indexing_threads = crate::default_indexing_threads();
82        #[cfg(not(feature = "native"))]
83        let indexing_threads = 1;
84
85        #[cfg(feature = "native")]
86        let compression_threads = crate::default_compression_threads();
87        #[cfg(not(feature = "native"))]
88        let compression_threads = 1;
89
90        Self {
91            num_threads: indexing_threads,
92            num_indexing_threads: 1, // Increase to 2+ for production to avoid stalls during segment build
93            num_compression_threads: compression_threads,
94            term_cache_blocks: 256,
95            store_cache_blocks: 32,
96            max_indexing_memory_bytes: 256 * 1024 * 1024, // 256 MB default
97            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
98            optimization: crate::structures::IndexOptimization::default(),
99            reload_interval_ms: 1000, // 1 second default
100            max_concurrent_merges: 4,
101        }
102    }
103}
104
/// Multi-segment async Index
///
/// The central concept for search. Owns segment lifecycle and provides:
/// - `Index::create()` / `Index::open()` - create or open an index
/// - `index.writer()` - get an IndexWriter for adding documents
/// - `index.reader()` - get an IndexReader for searching with reload policy
///
/// All segment management is delegated to SegmentManager.
///
/// `D` is the directory implementation the index lives in. The directory and
/// the schema are held behind `Arc` so they can be handed to the segment
/// manager and the reader without copying.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Directory backing this index
    directory: Arc<D>,
    /// Schema of the index (supplied to `create()`, or taken from the loaded metadata in `open()`)
    schema: Arc<Schema>,
    /// Configuration supplied at `create()` / `open()` time
    config: IndexConfig,
    /// Segment manager - owns segments, tracker, metadata, and trained structures
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Cached reader (created lazily on the first `reader()` call, reused across calls)
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
123
124#[cfg(feature = "native")]
125impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
126    /// Create a new index in the directory
127    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
128        let directory = Arc::new(directory);
129        let schema = Arc::new(schema);
130        let metadata = IndexMetadata::new((*schema).clone());
131
132        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
133            Arc::clone(&directory),
134            Arc::clone(&schema),
135            metadata,
136            config.merge_policy.clone_box(),
137            config.term_cache_blocks,
138            config.max_concurrent_merges,
139        ));
140
141        // Save initial metadata
142        segment_manager.update_metadata(|_| {}).await?;
143
144        Ok(Self {
145            directory,
146            schema,
147            config,
148            segment_manager,
149            cached_reader: tokio::sync::OnceCell::new(),
150        })
151    }
152
153    /// Open an existing index from a directory
154    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
155        let directory = Arc::new(directory);
156
157        // Load metadata (includes schema)
158        let metadata = IndexMetadata::load(directory.as_ref()).await?;
159        let schema = Arc::new(metadata.schema.clone());
160
161        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
162            Arc::clone(&directory),
163            Arc::clone(&schema),
164            metadata,
165            config.merge_policy.clone_box(),
166            config.term_cache_blocks,
167            config.max_concurrent_merges,
168        ));
169
170        // Load trained structures into SegmentManager's ArcSwap
171        segment_manager.load_and_publish_trained().await;
172
173        Ok(Self {
174            directory,
175            schema,
176            config,
177            segment_manager,
178            cached_reader: tokio::sync::OnceCell::new(),
179        })
180    }
181
182    /// Get the schema
183    pub fn schema(&self) -> &Schema {
184        &self.schema
185    }
186
187    /// Get the schema as an Arc reference (avoids clone when Arc is needed)
188    pub fn schema_arc(&self) -> &Arc<Schema> {
189        &self.schema
190    }
191
192    /// Get a reference to the underlying directory
193    pub fn directory(&self) -> &D {
194        &self.directory
195    }
196
197    /// Get the segment manager
198    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
199        &self.segment_manager
200    }
201
202    /// Get an IndexReader for searching (with reload policy)
203    ///
204    /// The reader is cached and reused across calls. The reader's internal
205    /// searcher will reload segments based on its reload interval (configurable via IndexConfig).
206    pub async fn reader(&self) -> Result<&IndexReader<D>> {
207        self.cached_reader
208            .get_or_try_init(|| async {
209                IndexReader::from_segment_manager(
210                    Arc::clone(&self.schema),
211                    Arc::clone(&self.segment_manager),
212                    self.config.term_cache_blocks,
213                    self.config.reload_interval_ms,
214                )
215                .await
216            })
217            .await
218    }
219
220    /// Get the config
221    pub fn config(&self) -> &IndexConfig {
222        &self.config
223    }
224
225    /// Get segment readers for query execution (convenience method)
226    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
227        let reader = self.reader().await?;
228        let searcher = reader.searcher().await?;
229        Ok(searcher.segment_readers().to_vec())
230    }
231
232    /// Total number of documents across all segments
233    pub async fn num_docs(&self) -> Result<u32> {
234        let reader = self.reader().await?;
235        let searcher = reader.searcher().await?;
236        Ok(searcher.num_docs())
237    }
238
239    /// Get default fields for search
240    pub fn default_fields(&self) -> Vec<crate::Field> {
241        if !self.schema.default_fields().is_empty() {
242            self.schema.default_fields().to_vec()
243        } else {
244            self.schema
245                .fields()
246                .filter(|(_, entry)| {
247                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
248                })
249                .map(|(field, _)| field)
250                .collect()
251        }
252    }
253
254    /// Get tokenizer registry
255    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
256        Arc::new(crate::tokenizer::TokenizerRegistry::default())
257    }
258
259    /// Create a query parser for this index
260    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
261        let default_fields = self.default_fields();
262        let tokenizers = self.tokenizers();
263
264        let query_routers = self.schema.query_routers();
265        if !query_routers.is_empty()
266            && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
267        {
268            return crate::dsl::QueryLanguageParser::with_router(
269                Arc::clone(&self.schema),
270                default_fields,
271                tokenizers,
272                router,
273            );
274        }
275
276        crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
277    }
278
279    /// Parse and search using a query string
280    pub async fn query(
281        &self,
282        query_str: &str,
283        limit: usize,
284    ) -> Result<crate::query::SearchResponse> {
285        self.query_offset(query_str, limit, 0).await
286    }
287
288    /// Query with offset for pagination
289    pub async fn query_offset(
290        &self,
291        query_str: &str,
292        limit: usize,
293        offset: usize,
294    ) -> Result<crate::query::SearchResponse> {
295        let parser = self.query_parser();
296        let query = parser
297            .parse(query_str)
298            .map_err(crate::error::Error::Query)?;
299        self.search_offset(query.as_ref(), limit, offset).await
300    }
301
302    /// Search and return results
303    pub async fn search(
304        &self,
305        query: &dyn crate::query::Query,
306        limit: usize,
307    ) -> Result<crate::query::SearchResponse> {
308        self.search_offset(query, limit, 0).await
309    }
310
311    /// Search with offset for pagination
312    pub async fn search_offset(
313        &self,
314        query: &dyn crate::query::Query,
315        limit: usize,
316        offset: usize,
317    ) -> Result<crate::query::SearchResponse> {
318        let reader = self.reader().await?;
319        let searcher = reader.searcher().await?;
320
321        #[cfg(feature = "sync")]
322        let (results, total_seen) = {
323            // Sync search: rayon handles segment parallelism internally.
324            // On multi-threaded tokio, use block_in_place to yield the worker;
325            // on single-threaded (tests), call directly.
326            let runtime_flavor = tokio::runtime::Handle::current().runtime_flavor();
327            if runtime_flavor == tokio::runtime::RuntimeFlavor::MultiThread {
328                tokio::task::block_in_place(|| {
329                    searcher.search_with_offset_and_count_sync(query, limit, offset)
330                })?
331            } else {
332                searcher.search_with_offset_and_count_sync(query, limit, offset)?
333            }
334        };
335
336        #[cfg(not(feature = "sync"))]
337        let (results, total_seen) = {
338            searcher
339                .search_with_offset_and_count(query, limit, offset)
340                .await?
341        };
342
343        let total_hits = total_seen;
344        let hits: Vec<crate::query::SearchHit> = results
345            .into_iter()
346            .map(|result| crate::query::SearchHit {
347                address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
348                score: result.score,
349                matched_fields: result.extract_ordinals(),
350            })
351            .collect();
352
353        Ok(crate::query::SearchResponse { hits, total_hits })
354    }
355
356    /// Get a document by its unique address
357    pub async fn get_document(
358        &self,
359        address: &crate::query::DocAddress,
360    ) -> Result<Option<crate::dsl::Document>> {
361        let reader = self.reader().await?;
362        let searcher = reader.searcher().await?;
363        searcher.get_document(address).await
364    }
365
366    /// Get posting lists for a term across all segments
367    pub async fn get_postings(
368        &self,
369        field: crate::Field,
370        term: &[u8],
371    ) -> Result<
372        Vec<(
373            Arc<crate::segment::SegmentReader>,
374            crate::structures::BlockPostingList,
375        )>,
376    > {
377        let segments = self.segment_readers().await?;
378        let mut results = Vec::new();
379
380        for segment in segments {
381            if let Some(postings) = segment.get_postings(field, term).await? {
382                results.push((segment, postings));
383            }
384        }
385
386        Ok(results)
387    }
388}
389
/// Native-only methods for Index
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Build an `IndexWriter` for adding documents to this index.
    ///
    /// Delegates to `IndexWriter::from_index`, passing a borrow of `self`.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        let index = self;
        writer::IndexWriter::<D>::from_index(index)
    }
}
398
399#[cfg(test)]
400mod tests;
401
402// (tests moved to index/tests/ module)