//! Read-side entry point for an index: opens a [`Directory`], loads the
//! [`Schema`] and segment readers, and serves searches and queries.

use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;

use rustc_hash::FxHashMap;

use crate::DocId;
use crate::directories::{Directory, SliceCachingDirectory};
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentId, SegmentReader};
use crate::structures::{BlockPostingList, CoarseCentroids, PQCodebook};

#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use writer::IndexWriter;

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

/// File name of the persisted slice-cache snapshot stored alongside the index.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

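/// Tuning knobs for opening and serving an index.
///
/// A minimal sketch of overriding a few fields while keeping the remaining
/// defaults (struct-update syntax; every field shown exists on this struct):
///
/// ```ignore
/// let config = IndexConfig {
///     num_threads: 4,
///     term_cache_blocks: 512,
///     ..IndexConfig::default()
/// };
/// ```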
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Size of the internal rayon thread pool.
    pub num_threads: usize,
    /// Threads dedicated to indexing.
    pub num_indexing_threads: usize,
    /// Threads dedicated to compressing segment data.
    pub num_compression_threads: usize,
    /// Term-dictionary cache size, in blocks, per segment.
    pub term_cache_blocks: usize,
    /// Document-store cache size, in blocks.
    pub store_cache_blocks: usize,
    /// Memory budget for the in-RAM segment before it is flushed.
    pub max_indexing_memory_bytes: usize,
    /// Policy that decides which segments get merged.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index-time optimization settings.
    pub optimization: crate::structures::IndexOptimization,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let cpus = num_cpus::get().max(1);
        #[cfg(not(feature = "native"))]
        let cpus = 1;

        Self {
            num_threads: cpus,
            num_indexing_threads: 1,
            num_compression_threads: cpus,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_indexing_memory_bytes: 2 * 1024 * 1024 * 1024,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
        }
    }
}

/// A searchable index: an open [`Directory`], its [`Schema`], and the current
/// set of segment readers.
pub struct Index<D: Directory> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    global_stats: crate::query::GlobalStatsCache,
    /// Coarse centroids trained per dense-vector field, keyed by field id.
    #[allow(dead_code)]
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Product-quantization codebooks per dense-vector field, keyed by field id.
    #[allow(dead_code)]
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<D: Directory> Index<D> {
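    /// Opens an existing index stored in `directory`.
    ///
    /// A minimal sketch, assuming `dir` already holds an index committed by
    /// an [`IndexWriter`] (`schema.json` plus segment files):
    ///
    /// ```ignore
    /// let index = Index::open(dir, IndexConfig::default()).await?;
    /// println!("{} docs", index.num_docs());
    /// ```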
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let (segments, trained_centroids, trained_codebooks) =
            Self::load_segments_and_trained(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Fall back to all indexed text fields when the schema declares no
        // explicit default fields.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids,
            trained_codebooks,
            #[cfg(feature = "native")]
            thread_pool,
        })
    }

    async fn load_segments_and_trained(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        FxHashMap<u32, Arc<CoarseCentroids>>,
        FxHashMap<u32, Arc<PQCodebook>>,
    )> {
        let meta = Self::load_metadata(directory).await?;

        let (trained_centroids, trained_codebooks) =
            meta.load_trained_structures(directory.as_ref()).await;

        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in meta.segments {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok((segments, trained_centroids, trained_codebooks))
    }

    async fn load_metadata(directory: &Arc<D>) -> Result<IndexMetadata> {
        let meta_path = Path::new(INDEX_META_FILENAME);
        if directory.exists(meta_path).await.unwrap_or(false) {
            let slice = directory.open_read(meta_path).await?;
            let bytes = slice.read_bytes().await?;
            let meta: IndexMetadata = serde_json::from_slice(bytes.as_slice())
                .map_err(|e| Error::Serialization(e.to_string()))?;
            return Ok(meta);
        }
        Ok(IndexMetadata::new())
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the underlying directory.
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Total number of documents across all open segments.
    pub fn num_docs(&self) -> u32 {
        self.segments.read().iter().map(|s| s.num_docs()).sum()
    }

    /// Fetches a document by its global (cross-segment) doc id.
    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
        let segments = self.segments.read().clone();

        let mut offset = 0u32;
        for segment in segments.iter() {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }

        Ok(None)
    }

    /// Returns the posting list for `term` in `field` from every segment
    /// that contains it.
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
        let segments = self.segments.read().clone();
        let mut results = Vec::new();

        for segment in segments.iter() {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((Arc::clone(segment), postings));
            }
        }

        Ok(results)
    }

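    /// Runs a blocking closure on the index's rayon thread pool and awaits
    /// its result, keeping the async executor free.
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let sum = index.spawn_blocking(|| (0..1_000u64).sum::<u64>()).await;
    /// ```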
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }

    /// Returns a snapshot of the currently open segment readers.
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }

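    /// Re-reads the metadata file and swaps in the segment set it lists,
    /// picking up commits and merges performed elsewhere; segments that can
    /// no longer be opened are skipped with a warning.
    ///
    /// A typical polling sketch:
    ///
    /// ```ignore
    /// if index.needs_reload().await? {
    ///     index.reload().await?;
    /// }
    /// ```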
    pub async fn reload(&self) -> Result<()> {
        let meta = Self::load_metadata(&self.directory).await?;

        let mut new_segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in meta.segments {
            let segment_id = match SegmentId::from_hex(&id_str) {
                Some(id) => id,
                None => {
                    log::warn!("Invalid segment ID in metadata: {}", id_str);
                    continue;
                }
            };

            match SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_id_offset,
                self.config.term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    doc_id_offset += reader.meta().num_docs;
                    new_segments.push(Arc::new(reader));
                }
                Err(e) => {
                    log::warn!(
                        "Could not open segment {}: {:?} (may have been merged)",
                        id_str,
                        e
                    );
                }
            }
        }

        *self.segments.write() = new_segments;
        self.global_stats.invalidate();
        Ok(())
    }

    /// Returns `true` when the on-disk segment list differs from the one
    /// currently loaded.
    pub async fn needs_reload(&self) -> Result<bool> {
        let meta = Self::load_metadata(&self.directory).await?;
        let current_segments = self.segments.read();

        if meta.segments.len() != current_segments.len() {
            return Ok(true);
        }

        for (meta_id, reader) in meta.segments.iter().zip(current_segments.iter()) {
            let reader_id = SegmentId::from_u128(reader.meta().id).to_hex();
            if meta_id != &reader_id {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Returns the cached global statistics, if they have been built.
    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
        self.global_stats.get()
    }

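    /// Builds corpus-wide statistics (total docs, per-term document
    /// frequencies, average field lengths) by scanning every segment, and
    /// caches the result until [`reload`](Self::reload) invalidates it.
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let stats = index.build_global_stats().await?; // cached on repeat calls
    /// ```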
    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
        if let Some(stats) = self.global_stats.get() {
            return Ok(stats);
        }

        let segments = self.segments.read().clone();
        let schema = &self.schema;
        let mut builder = crate::query::GlobalStatsBuilder::new();

        // Per-field accumulators of (sum of token lengths, doc count), used
        // to compute the global average field length below.
        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
            rustc_hash::FxHashMap::default();

        for segment in &segments {
            let num_docs = segment.num_docs() as u64;
            builder.total_docs += num_docs;

            for (&field_id, sparse_index) in segment.sparse_indexes() {
                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
                    if let Some(pl) = posting_list {
                        builder.add_sparse_df(
                            crate::dsl::Field(field_id),
                            dim_id as u32,
                            pl.doc_count() as u64,
                        );
                    }
                }
            }

            for (field, entry) in schema.fields() {
                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
                    let avg_len = segment.avg_field_len(field);
                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
                    *sum += (avg_len * num_docs as f32) as u64;
                    *count += num_docs;
                }
            }

            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
                builder.add_text_df(field, term, doc_freq as u64);
            }
        }

        for (field_id, (sum, count)) in field_len_sums {
            if count > 0 {
                let global_avg = sum as f32 / count as f32;
                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
            }
        }

        let generation = self.global_stats.generation();
        let stats = builder.build(generation);
        self.global_stats.set_stats(stats);

        Ok(self.global_stats.get().unwrap())
    }

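    /// Searches all segments and returns the top `limit` hits, merged and
    /// sorted by descending score.
    ///
    /// A minimal sketch using the query types exercised in the tests below
    /// (`title` is a text [`Field`] from the schema):
    ///
    /// ```ignore
    /// use crate::query::{BooleanQuery, TermQuery};
    ///
    /// let q = BooleanQuery::new().should(TermQuery::text(title, "rust"));
    /// let top = index.search(&q, 10).await?;
    /// ```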
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Like [`search`](Self::search), but skips the first `offset` hits.
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Like [`search`](Self::search), but also collects which fields matched
    /// each hit.
    pub async fn search_with_matched_fields(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, 0, true).await
    }

    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<crate::query::SearchResponse> {
        let segments = self.segments.read().clone();
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        // Each segment must return offset + limit candidates so the global
        // top-k after merging is correct.
        let fetch_limit = offset + limit;
        for segment in &segments {
            let segment_id = segment.meta().id;
            let results = if collect_positions {
                crate::query::search_segment_with_positions(segment.as_ref(), query, fetch_limit)
                    .await?
            } else {
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?
            };
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Resolves a [`DocAddress`](crate::query::DocAddress) from a search hit
    /// back to its stored document.
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<Document>> {
        let segment_id = address
            .segment_id_u128()
            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;

        let segments = self.segments.read().clone();
        for segment in &segments {
            if segment.meta().id == segment_id {
                return segment.doc(address.doc_id).await;
            }
        }

        Ok(None)
    }

    /// Fields searched when a query does not name a field explicitly.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }

    /// Builds a query-language parser over this index's schema, default
    /// fields, and tokenizers, attaching a field router when the schema
    /// declares routing rules.
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty() {
            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
                return crate::dsl::QueryLanguageParser::with_router(
                    Arc::clone(&self.schema),
                    self.default_fields.clone(),
                    Arc::clone(&self.tokenizers),
                    router,
                );
            }
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

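    /// Parses `query_str` with [`query_parser`](Self::query_parser) and runs
    /// the resulting query.
    ///
    /// A minimal sketch using the `field:term` syntax exercised in the tests
    /// below:
    ///
    /// ```ignore
    /// let response = index.query("title:rust", 10).await?;
    /// for hit in &response.hits {
    ///     println!("{} -> {:?}", hit.score, hit.address);
    /// }
    /// ```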
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Like [`query`](Self::query), but skips the first `offset` hits.
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser.parse(query_str).map_err(Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }
}

impl<D: Directory> Index<SliceCachingDirectory<D>> {
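    /// Opens an index wrapped in a [`SliceCachingDirectory`], pre-filling the
    /// cache from a previously saved `index.slicecache` file when present.
    ///
    /// A minimal sketch of the warm-open / save cycle (64 MiB cache assumed):
    ///
    /// ```ignore
    /// let index = Index::open_with_cache(dir.clone(), config, 64 << 20).await?;
    /// index.query("title:rust", 10).await?; // populate the cache
    /// index.save_slice_cache().await?;      // persist it for the next open
    /// ```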
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        // Best effort: a missing or unreadable cache file just means a cold start.
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Persists the current slice-cache contents next to the index files.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: crate::directories::DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Returns usage statistics for the slice cache.
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}

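/// Opens the index through a fresh [`SliceCachingDirectory`] so the open path
/// populates the cache, then persists it as `index.slicecache`.
///
/// A minimal sketch (64 MiB cache budget assumed):
///
/// ```ignore
/// warmup_and_save_slice_cache(dir, IndexConfig::default(), 64 << 20).await?;
/// ```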
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    index.save_slice_cache().await?;

    Ok(())
}

#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            // Each clone maintains its own stats cache.
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids: self.trained_centroids.clone(),
            trained_codebooks: self.trained_codebooks.clone(),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1); // the term lives in one segment
        assert_eq!(postings[0].1.doc_count(), 2); // both docs contain "world"

        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            // Tiny memory budget to force a segment per commit.
            max_indexing_memory_bytes: 1024,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert!(
            index.segment_readers().len() >= 2,
            "Expected multiple segments"
        );
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            // Tiny memory budget so each flush produces its own segment.
            max_indexing_memory_bytes: 512,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..3 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).unwrap();
            }
            writer.flush().await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.segment_readers().len() >= 2,
            "Expected multiple segments"
        );

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Every document must survive the merge.
        let mut found_docs = 0;
        for i in 0..9 {
            if index.doc(i).await.unwrap().is_some() {
                found_docs += 1;
            }
        }
        assert_eq!(found_docs, 9);
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        index.save_slice_cache().await.unwrap();

        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());

        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).unwrap();

        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        // Pure OR query: "rust" OR "programming".
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        // A single SHOULD clause behaves like a plain term query.
        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // MUST restricts the result set; SHOULD only contributes to scores.
        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        // MUST_NOT filters out matching documents.
        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // `limit` caps the number of returned hits.
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }

    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                store_raw: true,
                num_clusters: Some(4),
                nprobe: 2,
                mrl_dim: None,
                // Stay on flat (exact) search until 50 vectors exist.
                build_threshold: Some(50),
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 30 vectors: still below the build threshold of 50.
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.is_empty(),
            "Should not have trained centroids below threshold"
        );

        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // 30 more vectors: crossing the threshold should trigger the build.
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).unwrap();
        }
        writer.commit().await.unwrap();

        assert!(
            writer.is_vector_index_built(embedding).await,
            "Vector index should be built after crossing threshold"
        );

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.contains_key(&embedding.0),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        // An explicit rebuild must leave the index in the built state.
        writer.build_vector_index().await.unwrap();
        assert!(writer.is_vector_index_built(embedding).await);
    }
}