1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod reader;
22#[cfg(feature = "native")]
23mod vector_builder;
24#[cfg(feature = "native")]
25mod writer;
26#[cfg(feature = "native")]
27pub use reader::IndexReader;
28#[cfg(feature = "native")]
29pub use writer::{IndexWriter, PreparedCommit};
30
31mod metadata;
32pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
33
34#[cfg(feature = "native")]
35mod helpers;
36#[cfg(feature = "native")]
37pub use helpers::{
38 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
39 index_documents_from_reader, index_json_document, parse_schema,
40};
41
/// File name `index.slicecache`.
///
/// NOTE(review): presumably the on-disk name of the slice cache kept next to
/// the index; it is not referenced elsewhere in this file — confirm against
/// its users.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
44
/// Tunable settings for an index.
///
/// Within this module `merge_policy`, `term_cache_blocks` and
/// `max_concurrent_merges` are handed to the segment manager when an index is
/// created or opened, and `term_cache_blocks` together with
/// `reload_interval_ms` are handed to the reader constructor. The other fields
/// are not read in this file; their exact role should be confirmed against the
/// code that consumes them.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Thread count. Defaults to `crate::default_indexing_threads()` on
    /// `native` builds and to `1` otherwise.
    pub num_threads: usize,
    /// Presumably the number of threads used for indexing — TODO confirm.
    /// Defaults to `1`.
    pub num_indexing_threads: usize,
    /// Presumably the number of threads used for compression — TODO confirm.
    /// Defaults to `crate::default_compression_threads()` on `native` builds
    /// and to `1` otherwise.
    pub num_compression_threads: usize,
    /// Term cache size in blocks, passed to the segment manager and the
    /// reader. Defaults to `256`.
    pub term_cache_blocks: usize,
    /// Presumably the document-store cache size in blocks — TODO confirm.
    /// Defaults to `32`.
    pub store_cache_blocks: usize,
    /// Presumably the memory budget for indexing, in bytes — TODO confirm.
    /// Defaults to `256 * 1024 * 1024`.
    pub max_indexing_memory_bytes: usize,
    /// Merge policy; a boxed clone (`clone_box`) is given to the segment
    /// manager. Defaults to `TieredMergePolicy::default()`.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization settings. Defaults to `IndexOptimization::default()`.
    pub optimization: crate::structures::IndexOptimization,
    /// Reload interval in milliseconds, passed to the reader constructor.
    /// Defaults to `1000`.
    pub reload_interval_ms: u64,
    /// Limit on concurrent merges, passed to the segment manager.
    /// Defaults to `4`.
    pub max_concurrent_merges: usize,
}
69
70impl Default for IndexConfig {
71 fn default() -> Self {
72 #[cfg(feature = "native")]
73 let indexing_threads = crate::default_indexing_threads();
74 #[cfg(not(feature = "native"))]
75 let indexing_threads = 1;
76
77 #[cfg(feature = "native")]
78 let compression_threads = crate::default_compression_threads();
79 #[cfg(not(feature = "native"))]
80 let compression_threads = 1;
81
82 Self {
83 num_threads: indexing_threads,
84 num_indexing_threads: 1, num_compression_threads: compression_threads,
86 term_cache_blocks: 256,
87 store_cache_blocks: 32,
88 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
90 optimization: crate::structures::IndexOptimization::default(),
91 reload_interval_ms: 1000, max_concurrent_merges: 4,
93 }
94 }
95}
96
/// Handle to an index stored in a directory of type `D`.
///
/// Bundles the shared directory, schema and segment manager with the
/// configuration, and holds a lazily initialized [`IndexReader`].
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Shared handle to the backing directory.
    directory: Arc<D>,
    /// Shared schema; supplied by the caller on `create`, read from the stored
    /// metadata on `open`.
    schema: Arc<Schema>,
    /// Configuration this index was created or opened with.
    config: IndexConfig,
    /// Shared segment manager built from the directory, schema and metadata.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Reader constructed on the first successful `reader()` call and reused
    /// afterwards.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
115
116#[cfg(feature = "native")]
117impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
118 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
120 let directory = Arc::new(directory);
121 let schema = Arc::new(schema);
122 let metadata = IndexMetadata::new((*schema).clone());
123
124 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
125 Arc::clone(&directory),
126 Arc::clone(&schema),
127 metadata,
128 config.merge_policy.clone_box(),
129 config.term_cache_blocks,
130 config.max_concurrent_merges,
131 ));
132
133 segment_manager.update_metadata(|_| {}).await?;
135
136 Ok(Self {
137 directory,
138 schema,
139 config,
140 segment_manager,
141 cached_reader: tokio::sync::OnceCell::new(),
142 })
143 }
144
145 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
147 let directory = Arc::new(directory);
148
149 let metadata = IndexMetadata::load(directory.as_ref()).await?;
151 let schema = Arc::new(metadata.schema.clone());
152
153 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
154 Arc::clone(&directory),
155 Arc::clone(&schema),
156 metadata,
157 config.merge_policy.clone_box(),
158 config.term_cache_blocks,
159 config.max_concurrent_merges,
160 ));
161
162 segment_manager.load_and_publish_trained().await;
164
165 Ok(Self {
166 directory,
167 schema,
168 config,
169 segment_manager,
170 cached_reader: tokio::sync::OnceCell::new(),
171 })
172 }
173
174 pub fn schema(&self) -> &Schema {
176 &self.schema
177 }
178
    /// Borrows the shared (`Arc`) schema handle, so callers can clone the
    /// `Arc` instead of the schema itself.
    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }
183
184 pub fn directory(&self) -> &D {
186 &self.directory
187 }
188
    /// Borrows the shared (`Arc`) segment manager of this index.
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }
193
194 pub async fn reader(&self) -> Result<&IndexReader<D>> {
199 self.cached_reader
200 .get_or_try_init(|| async {
201 IndexReader::from_segment_manager(
202 Arc::clone(&self.schema),
203 Arc::clone(&self.segment_manager),
204 self.config.term_cache_blocks,
205 self.config.reload_interval_ms,
206 )
207 .await
208 })
209 .await
210 }
211
    /// Borrows the configuration this index was created or opened with.
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }
216
217 pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
219 let reader = self.reader().await?;
220 let searcher = reader.searcher().await?;
221 Ok(searcher.segment_readers().to_vec())
222 }
223
224 pub async fn num_docs(&self) -> Result<u32> {
226 let reader = self.reader().await?;
227 let searcher = reader.searcher().await?;
228 Ok(searcher.num_docs())
229 }
230
231 pub fn default_fields(&self) -> Vec<crate::Field> {
233 if !self.schema.default_fields().is_empty() {
234 self.schema.default_fields().to_vec()
235 } else {
236 self.schema
237 .fields()
238 .filter(|(_, entry)| {
239 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
240 })
241 .map(|(field, _)| field)
242 .collect()
243 }
244 }
245
246 pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
248 Arc::new(crate::tokenizer::TokenizerRegistry::default())
249 }
250
251 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
253 let default_fields = self.default_fields();
254 let tokenizers = self.tokenizers();
255
256 let query_routers = self.schema.query_routers();
257 if !query_routers.is_empty()
258 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
259 {
260 return crate::dsl::QueryLanguageParser::with_router(
261 Arc::clone(&self.schema),
262 default_fields,
263 tokenizers,
264 router,
265 );
266 }
267
268 crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
269 }
270
271 pub async fn query(
273 &self,
274 query_str: &str,
275 limit: usize,
276 ) -> Result<crate::query::SearchResponse> {
277 self.query_offset(query_str, limit, 0).await
278 }
279
280 pub async fn query_offset(
282 &self,
283 query_str: &str,
284 limit: usize,
285 offset: usize,
286 ) -> Result<crate::query::SearchResponse> {
287 let parser = self.query_parser();
288 let query = parser
289 .parse(query_str)
290 .map_err(crate::error::Error::Query)?;
291 self.search_offset(query.as_ref(), limit, offset).await
292 }
293
294 pub async fn search(
296 &self,
297 query: &dyn crate::query::Query,
298 limit: usize,
299 ) -> Result<crate::query::SearchResponse> {
300 self.search_offset(query, limit, 0).await
301 }
302
303 pub async fn search_offset(
305 &self,
306 query: &dyn crate::query::Query,
307 limit: usize,
308 offset: usize,
309 ) -> Result<crate::query::SearchResponse> {
310 let reader = self.reader().await?;
311 let searcher = reader.searcher().await?;
312
313 #[cfg(feature = "sync")]
314 let (results, total_seen) = {
315 let runtime_flavor = tokio::runtime::Handle::current().runtime_flavor();
319 if runtime_flavor == tokio::runtime::RuntimeFlavor::MultiThread {
320 tokio::task::block_in_place(|| {
321 searcher.search_with_offset_and_count_sync(query, limit, offset)
322 })?
323 } else {
324 searcher.search_with_offset_and_count_sync(query, limit, offset)?
325 }
326 };
327
328 #[cfg(not(feature = "sync"))]
329 let (results, total_seen) = {
330 searcher
331 .search_with_offset_and_count(query, limit, offset)
332 .await?
333 };
334
335 let total_hits = total_seen;
336 let hits: Vec<crate::query::SearchHit> = results
337 .into_iter()
338 .map(|result| crate::query::SearchHit {
339 address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
340 score: result.score,
341 matched_fields: result.extract_ordinals(),
342 })
343 .collect();
344
345 Ok(crate::query::SearchResponse { hits, total_hits })
346 }
347
348 pub async fn get_document(
350 &self,
351 address: &crate::query::DocAddress,
352 ) -> Result<Option<crate::dsl::Document>> {
353 let reader = self.reader().await?;
354 let searcher = reader.searcher().await?;
355 searcher.get_document(address).await
356 }
357
358 pub async fn get_postings(
360 &self,
361 field: crate::Field,
362 term: &[u8],
363 ) -> Result<
364 Vec<(
365 Arc<crate::segment::SegmentReader>,
366 crate::structures::BlockPostingList,
367 )>,
368 > {
369 let segments = self.segment_readers().await?;
370 let mut results = Vec::new();
371
372 for segment in segments {
373 if let Some(postings) = segment.get_postings(field, term).await? {
374 results.push((segment, postings));
375 }
376 }
377
378 Ok(results)
379 }
380}
381
#[cfg(feature = "native")]
impl<D> Index<D>
where
    D: crate::directories::DirectoryWriter + 'static,
{
    /// Creates an index writer from this index via `IndexWriter::from_index`.
    pub fn writer(&self) -> writer::IndexWriter<D> {
        writer::IndexWriter::from_index(self)
    }
}
390
391#[cfg(test)]
392mod tests;
393
394