1#[cfg(feature = "native")]
11use crate::dsl::Schema;
12#[cfg(feature = "native")]
13use crate::error::Result;
14#[cfg(feature = "native")]
15use std::sync::Arc;
16
17mod searcher;
18pub use searcher::Searcher;
19
20#[cfg(feature = "native")]
21mod primary_key;
22#[cfg(feature = "native")]
23mod reader;
24#[cfg(feature = "native")]
25mod vector_builder;
26#[cfg(all(feature = "wasm", not(feature = "native")))]
27mod wasm_writer;
28#[cfg(feature = "native")]
29mod writer;
30#[cfg(feature = "native")]
31pub use primary_key::PrimaryKeyIndex;
32#[cfg(feature = "native")]
33pub use reader::IndexReader;
34#[cfg(all(feature = "wasm", not(feature = "native")))]
35pub use wasm_writer::IndexWriter as WasmIndexWriter;
36#[cfg(feature = "native")]
37pub use writer::{IndexWriter, PreparedCommit};
38
39mod metadata;
40pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
41
42#[cfg(feature = "native")]
43mod helpers;
44#[cfg(feature = "native")]
45pub use helpers::{
46 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
47 index_documents_from_reader, index_json_document, parse_schema,
48};
49
/// File name `index.slicecache`, presumably where the index's slice cache is
/// persisted. NOTE(review): the reader/writer of this file is not visible here — confirm.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
52
/// Tunable settings used when creating or opening an `Index`.
///
/// In this module, `merge_policy` (via `clone_box`), `term_cache_blocks` and
/// `max_concurrent_merges` are handed to the segment manager, and
/// `term_cache_blocks` / `reload_interval_ms` are handed to the index reader.
/// The other fields are not read in this file. NOTE(review): they are presumably
/// consumed by the writer or other components — confirm at their use sites.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// General thread count. Defaults to `crate::default_indexing_threads()` on
    /// `native`, otherwise 1. NOTE(review): consumer not visible here.
    pub num_threads: usize,
    /// Number of indexing threads (defaults to 1). NOTE(review): consumer not visible here.
    pub num_indexing_threads: usize,
    /// Number of compression threads. Defaults to
    /// `crate::default_compression_threads()` on `native`, otherwise 1.
    pub num_compression_threads: usize,
    /// Term-cache size in blocks; passed to both the segment manager and the reader.
    pub term_cache_blocks: usize,
    /// Store-cache size in blocks. NOTE(review): consumer not visible here.
    pub store_cache_blocks: usize,
    /// Indexing memory budget in bytes (defaults to 256 MiB).
    pub max_indexing_memory_bytes: usize,
    /// Merge policy; each segment manager receives its own copy via `clone_box`.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index structure optimization setting. NOTE(review): consumer not visible here.
    pub optimization: crate::structures::IndexOptimization,
    /// Reader reload interval in milliseconds; passed to the index reader.
    pub reload_interval_ms: u64,
    /// Upper bound on concurrent merges; passed to the segment manager.
    pub max_concurrent_merges: usize,
}
77
78impl Default for IndexConfig {
79 fn default() -> Self {
80 #[cfg(feature = "native")]
81 let indexing_threads = crate::default_indexing_threads();
82 #[cfg(not(feature = "native"))]
83 let indexing_threads = 1;
84
85 #[cfg(feature = "native")]
86 let compression_threads = crate::default_compression_threads();
87 #[cfg(not(feature = "native"))]
88 let compression_threads = 1;
89
90 Self {
91 num_threads: indexing_threads,
92 num_indexing_threads: 1, num_compression_threads: compression_threads,
94 term_cache_blocks: 256,
95 store_cache_blocks: 32,
96 max_indexing_memory_bytes: 256 * 1024 * 1024, merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
98 optimization: crate::structures::IndexOptimization::default(),
99 reload_interval_ms: 1000, max_concurrent_merges: 4,
101 }
102 }
103}
104
/// A search index backed by a directory of type `D`.
///
/// Holds the shared directory and schema, the configuration it was created or
/// opened with, the segment manager, and a lazily built `IndexReader`.
#[cfg(feature = "native")]
pub struct Index<D: crate::directories::DirectoryWriter + 'static> {
    /// Storage backend; shared (via `Arc`) with the segment manager.
    directory: Arc<D>,
    /// Index schema; shared with the segment manager and the reader.
    schema: Arc<Schema>,
    /// Configuration supplied to `create` / `open`.
    config: IndexConfig,
    /// Segment manager, constructed with the index metadata and config settings.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Reader built on the first `reader()` call. An async `OnceCell` makes
    /// concurrent first callers share one initialization.
    cached_reader: tokio::sync::OnceCell<IndexReader<D>>,
}
123
124#[cfg(feature = "native")]
125impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
126 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
128 let directory = Arc::new(directory);
129 let schema = Arc::new(schema);
130 let metadata = IndexMetadata::new((*schema).clone());
131
132 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
133 Arc::clone(&directory),
134 Arc::clone(&schema),
135 metadata,
136 config.merge_policy.clone_box(),
137 config.term_cache_blocks,
138 config.max_concurrent_merges,
139 ));
140
141 segment_manager.update_metadata(|_| {}).await?;
143
144 Ok(Self {
145 directory,
146 schema,
147 config,
148 segment_manager,
149 cached_reader: tokio::sync::OnceCell::new(),
150 })
151 }
152
153 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
155 let directory = Arc::new(directory);
156
157 let metadata = IndexMetadata::load(directory.as_ref()).await?;
159 let schema = Arc::new(metadata.schema.clone());
160
161 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
162 Arc::clone(&directory),
163 Arc::clone(&schema),
164 metadata,
165 config.merge_policy.clone_box(),
166 config.term_cache_blocks,
167 config.max_concurrent_merges,
168 ));
169
170 segment_manager.load_and_publish_trained().await;
172
173 Ok(Self {
174 directory,
175 schema,
176 config,
177 segment_manager,
178 cached_reader: tokio::sync::OnceCell::new(),
179 })
180 }
181
182 pub fn schema(&self) -> &Schema {
184 &self.schema
185 }
186
187 pub fn schema_arc(&self) -> &Arc<Schema> {
189 &self.schema
190 }
191
192 pub fn directory(&self) -> &D {
194 &self.directory
195 }
196
197 pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
199 &self.segment_manager
200 }
201
202 pub async fn reader(&self) -> Result<&IndexReader<D>> {
207 self.cached_reader
208 .get_or_try_init(|| async {
209 IndexReader::from_segment_manager(
210 Arc::clone(&self.schema),
211 Arc::clone(&self.segment_manager),
212 self.config.term_cache_blocks,
213 self.config.reload_interval_ms,
214 )
215 .await
216 })
217 .await
218 }
219
220 pub fn config(&self) -> &IndexConfig {
222 &self.config
223 }
224
225 pub async fn segment_readers(&self) -> Result<Vec<Arc<crate::segment::SegmentReader>>> {
227 let reader = self.reader().await?;
228 let searcher = reader.searcher().await?;
229 Ok(searcher.segment_readers().to_vec())
230 }
231
232 pub async fn num_docs(&self) -> Result<u32> {
234 let reader = self.reader().await?;
235 let searcher = reader.searcher().await?;
236 Ok(searcher.num_docs())
237 }
238
239 pub fn default_fields(&self) -> Vec<crate::Field> {
241 if !self.schema.default_fields().is_empty() {
242 self.schema.default_fields().to_vec()
243 } else {
244 self.schema
245 .fields()
246 .filter(|(_, entry)| {
247 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
248 })
249 .map(|(field, _)| field)
250 .collect()
251 }
252 }
253
254 pub fn tokenizers(&self) -> Arc<crate::tokenizer::TokenizerRegistry> {
256 Arc::new(crate::tokenizer::TokenizerRegistry::default())
257 }
258
259 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
261 let default_fields = self.default_fields();
262 let tokenizers = self.tokenizers();
263
264 let query_routers = self.schema.query_routers();
265 if !query_routers.is_empty()
266 && let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers)
267 {
268 return crate::dsl::QueryLanguageParser::with_router(
269 Arc::clone(&self.schema),
270 default_fields,
271 tokenizers,
272 router,
273 );
274 }
275
276 crate::dsl::QueryLanguageParser::new(Arc::clone(&self.schema), default_fields, tokenizers)
277 }
278
279 pub async fn query(
281 &self,
282 query_str: &str,
283 limit: usize,
284 ) -> Result<crate::query::SearchResponse> {
285 self.query_offset(query_str, limit, 0).await
286 }
287
288 pub async fn query_offset(
290 &self,
291 query_str: &str,
292 limit: usize,
293 offset: usize,
294 ) -> Result<crate::query::SearchResponse> {
295 let parser = self.query_parser();
296 let query = parser
297 .parse(query_str)
298 .map_err(crate::error::Error::Query)?;
299 self.search_offset(query.as_ref(), limit, offset).await
300 }
301
302 pub async fn search(
304 &self,
305 query: &dyn crate::query::Query,
306 limit: usize,
307 ) -> Result<crate::query::SearchResponse> {
308 self.search_offset(query, limit, 0).await
309 }
310
311 pub async fn search_offset(
313 &self,
314 query: &dyn crate::query::Query,
315 limit: usize,
316 offset: usize,
317 ) -> Result<crate::query::SearchResponse> {
318 let reader = self.reader().await?;
319 let searcher = reader.searcher().await?;
320
321 #[cfg(feature = "sync")]
322 let (results, total_seen) = {
323 let runtime_flavor = tokio::runtime::Handle::current().runtime_flavor();
327 if runtime_flavor == tokio::runtime::RuntimeFlavor::MultiThread {
328 tokio::task::block_in_place(|| {
329 searcher.search_with_offset_and_count_sync(query, limit, offset)
330 })?
331 } else {
332 searcher.search_with_offset_and_count_sync(query, limit, offset)?
333 }
334 };
335
336 #[cfg(not(feature = "sync"))]
337 let (results, total_seen) = {
338 searcher
339 .search_with_offset_and_count(query, limit, offset)
340 .await?
341 };
342
343 let total_hits = total_seen;
344 let hits: Vec<crate::query::SearchHit> = results
345 .into_iter()
346 .map(|result| crate::query::SearchHit {
347 address: crate::query::DocAddress::new(result.segment_id, result.doc_id),
348 score: result.score,
349 matched_fields: result.extract_ordinals(),
350 })
351 .collect();
352
353 Ok(crate::query::SearchResponse { hits, total_hits })
354 }
355
356 pub async fn get_document(
358 &self,
359 address: &crate::query::DocAddress,
360 ) -> Result<Option<crate::dsl::Document>> {
361 let reader = self.reader().await?;
362 let searcher = reader.searcher().await?;
363 searcher.get_document(address).await
364 }
365
366 pub async fn get_postings(
368 &self,
369 field: crate::Field,
370 term: &[u8],
371 ) -> Result<
372 Vec<(
373 Arc<crate::segment::SegmentReader>,
374 crate::structures::BlockPostingList,
375 )>,
376 > {
377 let segments = self.segment_readers().await?;
378 let mut results = Vec::new();
379
380 for segment in segments {
381 if let Some(postings) = segment.get_postings(field, term).await? {
382 results.push((segment, postings));
383 }
384 }
385
386 Ok(results)
387 }
388}
389
#[cfg(feature = "native")]
impl<D: crate::directories::DirectoryWriter + 'static> Index<D> {
    /// Creates an `IndexWriter` bound to this index.
    ///
    /// Construction is delegated entirely to `IndexWriter::from_index`.
    pub fn writer(&self) -> IndexWriter<D> {
        IndexWriter::from_index(self)
    }
}
398
399#[cfg(test)]
400mod tests;
401
402