1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod primary_key;
22#[cfg(feature = "native")]
23mod reader;
24#[cfg(feature = "native")]
25mod vector_builder;
26#[cfg(feature = "native")]
27mod writer;
28#[cfg(feature = "native")]
29pub use primary_key::PrimaryKeyIndex;
30#[cfg(feature = "native")]
31pub use reader::IndexReader;
32#[cfg(feature = "native")]
33pub use writer::{IndexWriter, PreparedCommit};
34
35mod metadata;
36pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
37
38#[cfg(feature = "native")]
39mod helpers;
40#[cfg(feature = "native")]
41pub use helpers::{
42 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
43 index_documents_from_reader, index_json_document, parse_schema,
44};
45
/// File name (`"index.slicecache"`) reserved for the slice cache.
///
/// NOTE(review): where this file is written and read is not visible in this module —
/// presumably inside the index directory; confirm against its users.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
48
/// Configuration handed to `Index::create` / `Index::open`.
///
/// Baseline values come from the `Default` impl; the defaults quoted on each field below
/// are the ones that impl produces.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Worker thread count. Defaults to `crate::default_indexing_threads()` with the
    /// `native` feature, otherwise 1.
    /// NOTE(review): how this differs from `num_indexing_threads` is not visible here.
    pub num_threads: usize,
    /// Number of indexing threads. Defaults to 1, independent of `num_threads`.
    pub num_indexing_threads: usize,
    /// Number of compression threads. Defaults to `crate::default_compression_threads()`
    /// with the `native` feature, otherwise 1.
    pub num_compression_threads: usize,
    /// Term cache size in blocks; forwarded to the segment manager and to the index
    /// reader. Defaults to 256.
    pub term_cache_blocks: usize,
    /// Store cache size in blocks. Defaults to 32. (Not consumed in this module.)
    pub store_cache_blocks: usize,
    /// Memory budget for indexing, in bytes. Defaults to 256 MiB.
    pub max_indexing_memory_bytes: usize,
    /// Merge policy; a copy (via `clone_box`) is given to the segment manager.
    /// Defaults to `TieredMergePolicy::default()`.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization setting. Defaults to `IndexOptimization::default()`.
    pub optimization: crate::structures::IndexOptimization,
    /// Reload interval in milliseconds, forwarded to the index reader. Defaults to 1000.
    pub reload_interval_ms: u64,
    /// Upper bound on concurrent merges, forwarded to the segment manager. Defaults to 4.
    pub max_concurrent_merges: usize,
}
73
74impl Default for IndexConfig {
75 fn default() -> Self {
76 #[cfg(feature = "native")]
77 let indexing_threads = crate::default_indexing_threads();
78 #[cfg(not(feature = "native"))]
79 let indexing_threads = 1;
80
81 #[cfg(feature = "native")]
82 let compression_threads = crate::default_compression_threads();
83 #[cfg(not(feature = "native"))]
84 let compression_threads = 1;
85
86 Self {
87 num_threads: indexing_threads,
88 num_indexing_threads: 1, num_compression_threads: compression_threads,
90 term_cache_blocks: 256,
91 store_cache_blocks: 32,
92 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
94 optimization: crate::structures::IndexOptimization::default(),
95 reload_interval_ms: 1000, max_concurrent_merges: 4,
97 }
98 }
99}
100
/// An index backed by a directory of type `D`, bundling its schema, configuration and
/// segment manager.
///
/// An [`IndexReader`] is created lazily on the first call to `Index::reader` and then
/// cached for the lifetime of this value.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Directory handle, shared (via `Arc`) with the segment manager.
    directory: Arc<D>,
    /// Index schema, shared with the segment manager and the cached reader.
    schema: Arc<Schema>,
    /// Configuration the index was created or opened with.
    config: IndexConfig,
    /// Segment manager built from `directory`, `schema` and `config`.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Lazily initialized reader; filled on the first successful `Index::reader` call.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
119
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Builds the segment manager shared by [`Index::create`] and [`Index::open`].
    ///
    /// The manager receives its own `Arc` handles to `directory` and `schema`, the given
    /// `metadata`, a boxed copy of the configured merge policy, and the configured term
    /// cache size and merge concurrency limit.
    fn build_segment_manager(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        metadata: IndexMetadata,
        config: &IndexConfig,
    ) -> Arc<crate::merge::SegmentManager<D>> {
        Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(directory),
            Arc::clone(schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ))
    }

    /// Creates a new index in `directory` with the given `schema` and `config`.
    ///
    /// Fresh [`IndexMetadata`] is built from a copy of the schema and handed to a new
    /// segment manager. `update_metadata` is then invoked with a no-op closure.
    /// NOTE(review): that call presumably persists the initial metadata to `directory` —
    /// confirm in `SegmentManager::update_metadata`.
    ///
    /// # Errors
    /// Propagates any error returned by `update_metadata`.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let shared_dir = Arc::new(directory);
        let shared_schema = Arc::new(schema);
        let initial_meta = IndexMetadata::new(Schema::clone(&shared_schema));

        let manager = Self::build_segment_manager(&shared_dir, &shared_schema, initial_meta, &config);
        manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory: shared_dir,
            schema: shared_schema,
            config,
            segment_manager: manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Opens an existing index stored in `directory`.
    ///
    /// The schema is taken from the metadata loaded via [`IndexMetadata::load`], and the
    /// segment manager is asked to `load_and_publish_trained` before the index is returned.
    ///
    /// # Errors
    /// Propagates any error from loading the index metadata.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let shared_dir = Arc::new(directory);
        let stored_meta = IndexMetadata::load(shared_dir.as_ref()).await?;
        let shared_schema = Arc::new(stored_meta.schema.clone());

        let manager = Self::build_segment_manager(&shared_dir, &shared_schema, stored_meta, &config);
        manager.load_and_publish_trained().await;

        Ok(Self {
            directory: shared_dir,
            schema: shared_schema,
            config,
            segment_manager: manager,
            cached_reader: tokio::sync::OnceCell::new(),
        })
    }

    /// Borrows the index schema.
    pub fn schema(&self) -> &Schema {
        &*self.schema
    }

    /// Borrows the shared `Arc` handle to the schema, for callers that need to keep it.
    pub fn schema_arc(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Borrows the underlying directory.
    pub fn directory(&self) -> &D {
        &*self.directory
    }

    /// Borrows the shared segment manager.
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

    /// Returns the cached [`IndexReader`], creating it on first use.
    ///
    /// The reader is built with `IndexReader::from_segment_manager`, using this index's
    /// schema, segment manager, `term_cache_blocks` and `reload_interval_ms`.
    ///
    /// # Errors
    /// Returns the construction error if building the reader fails; the cache then stays
    /// empty, so a later call tries again.
    pub async fn reader(&self) -> Result<&IndexReader<D>> {
        let open_reader = || async {
            IndexReader::from_segment_manager(
                Arc::clone(&self.schema),
                Arc::clone(&self.segment_manager),
                self.config.term_cache_blocks,
                self.config.reload_interval_ms,
            )
            .await
        };
        self.cached_reader.get_or_try_init(open_reader).await
    }

    /// Borrows the configuration this index was created or opened with.
    pub fn config(&self) -> &IndexConfig {
        &self.config
    }

    /// Returns a snapshot of the segment readers exposed by a fresh searcher.
    ///
    /// # Errors
    /// Propagates failures from obtaining the reader or the searcher.
    pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
        let searcher = self.reader().await?.searcher().await?;
        Ok(searcher.segment_readers().to_vec())
    }

    /// Returns the document count reported by a fresh searcher.
    ///
    /// # Errors
    /// Propagates failures from obtaining the reader or the searcher.
    pub async fn num_docs(&self) -> Result<u32> {
        let searcher = self.reader().await?.searcher().await?;
        Ok(searcher.num_docs())
    }

    /// Fields searched when a query does not name one explicitly.
    ///
    /// Uses the schema's declared default fields when there are any; otherwise falls back
    /// to every indexed field whose type is `FieldType::Text`.
    pub fn default_fields(&self) -> Vec<crate::Field> {
        let configured = self.schema.default_fields();
        if configured.is_empty() {
            self.schema
                .fields()
                .filter_map(|(field, entry)| {
                    (entry.indexed && entry.field_type == crate::dsl::FieldType::Text)
                        .then_some(field)
                })
                .collect()
        } else {
            configured.to_vec()
        }
    }

    /// Returns a newly allocated default tokenizer registry (a fresh one on every call).
    pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
        Arc::new(Default::default())
    }

    /// Builds a query-language parser over this index's schema.
    ///
    /// The parser uses [`Index::default_fields`] and [`Index::tokenizers`]. If the schema
    /// declares query routing rules and a `QueryFieldRouter` can be built from them, the
    /// parser is router-aware. If the router cannot be built, the error is discarded and a
    /// plain parser is returned.
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let fields = self.default_fields();
        let registry = self.tokenizers();

        let rules = self.schema.query_routers();
        let router = if rules.is_empty() {
            None
        } else {
            crate::dsl::QueryFieldRouter::from_rules(rules).ok()
        };

        match router {
            Some(router) => crate::dsl::QueryLanguageParser::with_router(
                Arc::clone(&self.schema),
                fields,
                registry,
                router,
            ),
            None => {
                crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), fields, registry)
            }
        }
    }

    /// Parses `query_str` and returns up to `limit` hits, starting at offset 0.
    ///
    /// # Errors
    /// See [`Index::query_offset`].
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Parses `query_str` with [`Index::query_parser`] and returns up to `limit` hits,
    /// skipping the first `offset`.
    ///
    /// # Errors
    /// A parse failure is returned as `Error::Query`; search failures are propagated.
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let parsed = parser.parse(query_str).map_err(crate::error::Error::Query)?;
        self.search_offset(parsed.as_ref(), limit, offset).await
    }

    /// Runs an already-built `query` and returns up to `limit` hits, starting at offset 0.
    ///
    /// # Errors
    /// See [`Index::search_offset`].
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Runs `query` and returns up to `limit` hits after skipping `offset`.
    ///
    /// The response also carries the total hit count reported by the searcher.
    ///
    /// # Errors
    /// Propagates failures from obtaining the reader or searcher, or from the search itself.
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let searcher = self.reader().await?.searcher().await?;

        // With the `sync` feature the search runs synchronously. On a multi-threaded
        // runtime it is wrapped in `block_in_place` so other tasks can move off this worker.
        // `block_in_place` panics on a current-thread runtime, so there the search runs inline.
        #[cfg(feature = "sync")]
        let (results, total_hits) = {
            let run_search = || searcher.search_with_offset_and_count_sync(query, limit, offset);
            let flavor = tokio::runtime::Handle::current().runtime_flavor();
            if matches!(flavor, tokio::runtime::RuntimeFlavor::MultiThread) {
                tokio::task::block_in_place(run_search)?
            } else {
                run_search()?
            }
        };

        #[cfg(not(feature = "sync"))]
        let (results, total_hits) = searcher
            .search_with_offset_and_count(query, limit, offset)
            .await?;

        let hits = results
            .into_iter()
            .map(|scored| crate::query::SearchHit {
                address: crate::query::DocAddress::new(scored.segment_id, scored.doc_id),
                score: scored.score,
                matched_fields: scored.extract_ordinals(),
            })
            .collect::<Vec<crate::query::SearchHit>>();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Fetches the stored document at `address` through a fresh searcher.
    ///
    /// NOTE(review): `Ok(None)` presumably means no document exists at `address` — confirm
    /// against `Searcher::get_document`.
    ///
    /// # Errors
    /// Propagates failures from obtaining the reader or searcher, or from the lookup.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<crate::dsl::Document>> {
        let searcher = self.reader().await?.searcher().await?;
        searcher.get_document(address).await
    }

    /// Collects the postings list for `term` in `field` from every segment that has one.
    ///
    /// Each posting list is paired with its segment reader. Segments are queried one after
    /// another, and segments without postings for the term are skipped.
    ///
    /// # Errors
    /// Propagates the first failure from fetching segment readers or postings.
    pub async fn get_postings(
        &self,
        field: crate::Field,
        term: &[u8],
    ) -> Result<
        Vec<(
            Arc<crate::segment::SegmentReader>,
            crate::structures::BlockPostingList,
        )>,
    > {
        let mut found = Vec::new();
        for reader in self.segment_readers().await? {
            if let Some(list) = reader.get_postings(field, term).await? {
                found.push((reader, list));
            }
        }
        Ok(found)
    }
}
385
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Builds an [`IndexWriter`] for this index by calling `IndexWriter::from_index`.
    pub fn writer(&self) -> IndexWriter<D> {
        IndexWriter::from_index(self)
    }
}
394
395#[cfg(test)]
396mod tests;
397
398