1use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16use crate::directories::{Directory, SliceCachingDirectory};
17use crate::dsl::{Document, Field, Schema};
18use crate::error::{Error, Result};
19use crate::segment::{SegmentId, SegmentReader};
20use crate::structures::BlockPostingList;
21use crate::structures::{CoarseCentroids, PQCodebook};
22
23#[cfg(feature = "native")]
24mod writer;
25#[cfg(feature = "native")]
26pub use writer::IndexWriter;
27
28#[cfg(feature = "native")]
29mod metadata;
30#[cfg(feature = "native")]
31pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};
32
33#[cfg(feature = "native")]
34mod helpers;
35#[cfg(feature = "native")]
36pub use helpers::{
37 IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
38 index_documents_from_reader, index_json_document, parse_schema,
39};
40
/// Name of the file, relative to the index directory, that holds the
/// serialized slice cache read by `open_with_cache` and written by
/// `save_slice_cache`.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
43
/// Tunable settings shared by index readers and writers.
///
/// `Default` picks CPU-count based thread settings on `native` builds and a
/// single thread otherwise.
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads given to the rayon pool built in `Index::open`
    /// (`native` builds only).
    pub num_threads: usize,
    /// Not read in this file; presumably the number of indexing workers used
    /// by the writer — TODO confirm.
    pub num_indexing_threads: usize,
    /// Not read in this file; presumably the number of compression workers
    /// used by the writer — TODO confirm.
    pub num_compression_threads: usize,
    /// Passed to `SegmentReader::open` for every segment the index opens.
    pub term_cache_blocks: usize,
    /// Not read in this file; presumably the document-store counterpart of
    /// `term_cache_blocks` — TODO confirm.
    pub store_cache_blocks: usize,
    /// Not read in this file outside the tests; presumably the document count
    /// at which the writer starts a new segment — TODO confirm.
    pub max_docs_per_segment: u32,
    /// Merge policy object; not consulted in this file.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index optimization settings; not consulted in this file.
    pub optimization: crate::structures::IndexOptimization,
}
64
65impl Default for IndexConfig {
66 fn default() -> Self {
67 #[cfg(feature = "native")]
68 let cpus = num_cpus::get().max(1);
69 #[cfg(not(feature = "native"))]
70 let cpus = 1;
71
72 Self {
73 num_threads: cpus,
74 num_indexing_threads: 1,
75 num_compression_threads: cpus,
76 term_cache_blocks: 256,
77 store_cache_blocks: 32,
78 max_docs_per_segment: 100_000,
79 merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
80 optimization: crate::structures::IndexOptimization::default(),
81 }
82 }
83}
84
/// Read-side handle on an index stored in a [`Directory`].
///
/// Holds the schema, the currently loaded segment readers and the supporting
/// caches needed to answer queries.
pub struct Index<D: Directory> {
    /// Storage backend every index file is read from.
    directory: Arc<D>,
    /// Schema parsed from `schema.json` when the index was opened.
    schema: Arc<Schema>,
    /// Configuration supplied to `open`; reused by `reload`.
    config: IndexConfig,
    /// Currently loaded segment readers; replaced wholesale by `reload`.
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    /// Fields handed to the query parser as defaults (see `query_parser`).
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry shared with every query parser built from this index.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Cache of cross-segment statistics; invalidated by `reload`.
    global_stats: crate::query::GlobalStatsCache,
    /// Trained coarse centroids loaded from the index metadata at open time,
    /// keyed by field id (the tests look entries up with `Field.0`).
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained PQ codebooks loaded from the index metadata at open time;
    /// presumably also keyed by field id — TODO confirm.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    /// Rayon pool used by `spawn_blocking` (`native` builds only).
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
105
106impl<D: Directory> Index<D> {
107 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
109 let directory = Arc::new(directory);
110
111 let schema_slice = directory.open_read(Path::new("schema.json")).await?;
113 let schema_bytes = schema_slice.read_bytes().await?;
114 let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
115 .map_err(|e| Error::Serialization(e.to_string()))?;
116 let schema = Arc::new(schema);
117
118 let (segments, trained_centroids, trained_codebooks) =
120 Self::load_segments_and_trained(&directory, &schema, &config).await?;
121
122 #[cfg(feature = "native")]
123 let thread_pool = {
124 let pool = rayon::ThreadPoolBuilder::new()
125 .num_threads(config.num_threads)
126 .build()
127 .map_err(|e| Error::Io(std::io::Error::other(e)))?;
128 Arc::new(pool)
129 };
130
131 let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
133 schema.default_fields().to_vec()
134 } else {
135 schema
136 .fields()
137 .filter(|(_, entry)| {
138 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
139 })
140 .map(|(field, _)| field)
141 .collect()
142 };
143
144 Ok(Self {
145 directory,
146 schema,
147 config,
148 segments: RwLock::new(segments),
149 default_fields,
150 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
151 global_stats: crate::query::GlobalStatsCache::new(),
152 trained_centroids,
153 trained_codebooks,
154 #[cfg(feature = "native")]
155 thread_pool,
156 })
157 }
158
159 async fn load_segments_and_trained(
161 directory: &Arc<D>,
162 schema: &Arc<Schema>,
163 config: &IndexConfig,
164 ) -> Result<(
165 Vec<Arc<SegmentReader>>,
166 FxHashMap<u32, Arc<CoarseCentroids>>,
167 FxHashMap<u32, Arc<PQCodebook>>,
168 )> {
169 let meta = Self::load_metadata(directory).await?;
171
172 let (trained_centroids, trained_codebooks) =
174 meta.load_trained_structures(directory.as_ref()).await;
175
176 let mut segments = Vec::new();
178 let mut doc_id_offset = 0u32;
179
180 for id_str in meta.segments {
181 let segment_id = SegmentId::from_hex(&id_str)
182 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
183 let reader = SegmentReader::open(
184 directory.as_ref(),
185 segment_id,
186 Arc::clone(schema),
187 doc_id_offset,
188 config.term_cache_blocks,
189 )
190 .await?;
191
192 doc_id_offset += reader.meta().num_docs;
193 segments.push(Arc::new(reader));
194 }
195
196 Ok((segments, trained_centroids, trained_codebooks))
197 }
198
199 async fn load_metadata(directory: &Arc<D>) -> Result<IndexMetadata> {
201 let meta_path = Path::new(INDEX_META_FILENAME);
202 if directory.exists(meta_path).await.unwrap_or(false) {
203 let slice = directory.open_read(meta_path).await?;
204 let bytes = slice.read_bytes().await?;
205 let meta: IndexMetadata = serde_json::from_slice(bytes.as_slice())
206 .map_err(|e| Error::Serialization(e.to_string()))?;
207 return Ok(meta);
208 }
209 Ok(IndexMetadata::new())
210 }
211
212 pub fn schema(&self) -> &Schema {
214 &self.schema
215 }
216
217 pub fn directory(&self) -> &D {
219 &self.directory
220 }
221
222 pub fn num_docs(&self) -> u32 {
224 self.segments.read().iter().map(|s| s.num_docs()).sum()
225 }
226
227 pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
229 let segments = self.segments.read().clone();
230
231 let mut offset = 0u32;
232 for segment in segments.iter() {
233 let segment_docs = segment.meta().num_docs;
234 if doc_id < offset + segment_docs {
235 let local_doc_id = doc_id - offset;
236 return segment.doc(local_doc_id).await;
237 }
238 offset += segment_docs;
239 }
240
241 Ok(None)
242 }
243
244 pub async fn get_postings(
246 &self,
247 field: Field,
248 term: &[u8],
249 ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
250 let segments = self.segments.read().clone();
251 let mut results = Vec::new();
252
253 for segment in segments.iter() {
254 if let Some(postings) = segment.get_postings(field, term).await? {
255 results.push((Arc::clone(segment), postings));
256 }
257 }
258
259 Ok(results)
260 }
261
262 #[cfg(feature = "native")]
264 pub async fn spawn_blocking<F, R>(&self, f: F) -> R
265 where
266 F: FnOnce() -> R + Send + 'static,
267 R: Send + 'static,
268 {
269 let (tx, rx) = tokio::sync::oneshot::channel();
270 self.thread_pool.spawn(move || {
271 let result = f();
272 let _ = tx.send(result);
273 });
274 rx.await.expect("Thread pool task panicked")
275 }
276
277 pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
279 self.segments.read().clone()
280 }
281
282 pub async fn reload(&self) -> Result<()> {
285 let (new_segments, _, _) =
286 Self::load_segments_and_trained(&self.directory, &self.schema, &self.config).await?;
287 *self.segments.write() = new_segments;
288 self.global_stats.invalidate();
290 Ok(())
291 }
292
    /// Returns whatever the global statistics cache currently holds, without
    /// computing anything; use `build_global_stats` to populate it.
    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
        self.global_stats.get()
    }
305
306 pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
313 if let Some(stats) = self.global_stats.get() {
315 return Ok(stats);
316 }
317
318 let segments = self.segments.read().clone();
319 let schema = &self.schema;
320 let mut builder = crate::query::GlobalStatsBuilder::new();
321
322 let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
324 rustc_hash::FxHashMap::default();
325
326 for segment in &segments {
327 let num_docs = segment.num_docs() as u64;
328 builder.total_docs += num_docs;
329
330 for (&field_id, sparse_index) in segment.sparse_indexes() {
332 for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
333 if let Some(pl) = posting_list {
334 builder.add_sparse_df(
335 crate::dsl::Field(field_id),
336 dim_id as u32,
337 pl.doc_count() as u64,
338 );
339 }
340 }
341 }
342
343 for (field, entry) in schema.fields() {
345 if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
346 let avg_len = segment.avg_field_len(field);
347 let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
348 *sum += (avg_len * num_docs as f32) as u64;
349 *count += num_docs;
350 }
351 }
352
353 for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
355 builder.add_text_df(field, term, doc_freq as u64);
356 }
357 }
358
359 for (field_id, (sum, count)) in field_len_sums {
361 if count > 0 {
362 let global_avg = sum as f32 / count as f32;
363 builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
364 }
365 }
366
367 let generation = self.global_stats.generation();
368 let stats = builder.build(generation);
369 self.global_stats.set_stats(stats);
370
371 Ok(self.global_stats.get().unwrap())
372 }
373
    /// Runs `query` and returns at most `limit` hits.
    ///
    /// Equivalent to `search_offset(query, limit, 0)`.
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }
384
    /// Runs `query`, skips the first `offset` hits of the merged ranking and
    /// returns at most `limit` of the following ones.
    ///
    /// Delegates to `search_internal` with position collection disabled.
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, offset, false).await
    }
394
    /// Runs `query` like `search`, but asks the segment search to collect
    /// positions (`collect_positions = true`).
    ///
    /// Always starts at offset 0.
    pub async fn search_with_matched_fields(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, 0, true).await
    }
405
406 async fn search_internal(
407 &self,
408 query: &dyn crate::query::Query,
409 limit: usize,
410 offset: usize,
411 collect_positions: bool,
412 ) -> Result<crate::query::SearchResponse> {
413 let segments = self.segments.read().clone();
414 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
415
416 let fetch_limit = offset + limit;
418 for segment in &segments {
419 let segment_id = segment.meta().id;
420 let results = crate::query::search_segment_with_positions(
421 segment.as_ref(),
422 query,
423 fetch_limit,
424 collect_positions,
425 )
426 .await?;
427 for result in results {
428 all_results.push((segment_id, result));
429 }
430 }
431
432 all_results.sort_by(|a, b| {
434 b.1.score
435 .partial_cmp(&a.1.score)
436 .unwrap_or(std::cmp::Ordering::Equal)
437 });
438
439 let total_hits = all_results.len() as u32;
441
442 let hits: Vec<crate::query::SearchHit> = all_results
444 .into_iter()
445 .skip(offset)
446 .take(limit)
447 .map(|(segment_id, result)| crate::query::SearchHit {
448 address: crate::query::DocAddress::new(segment_id, result.doc_id),
449 score: result.score,
450 matched_fields: result.extract_ordinals(),
451 })
452 .collect();
453
454 Ok(crate::query::SearchResponse { hits, total_hits })
455 }
456
457 pub async fn get_document(
459 &self,
460 address: &crate::query::DocAddress,
461 ) -> Result<Option<Document>> {
462 let segment_id = address
463 .segment_id_u128()
464 .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
465
466 let segments = self.segments.read().clone();
467 for segment in &segments {
468 if segment.meta().id == segment_id {
469 return segment.doc(address.doc_id).await;
470 }
471 }
472
473 Ok(None)
474 }
475
476 pub fn default_fields(&self) -> &[crate::Field] {
478 &self.default_fields
479 }
480
    /// Replaces the default fields handed to query parsers built afterwards
    /// by `query_parser`.
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }
485
    /// Returns the tokenizer registry shared with the query parsers built
    /// from this index.
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }
490
491 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
496 let query_routers = self.schema.query_routers();
498 if !query_routers.is_empty() {
499 if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
501 return crate::dsl::QueryLanguageParser::with_router(
502 Arc::clone(&self.schema),
503 self.default_fields.clone(),
504 Arc::clone(&self.tokenizers),
505 router,
506 );
507 }
508 }
509
510 crate::dsl::QueryLanguageParser::new(
512 Arc::clone(&self.schema),
513 self.default_fields.clone(),
514 Arc::clone(&self.tokenizers),
515 )
516 }
517
    /// Parses `query_str` and returns at most `limit` hits.
    ///
    /// Equivalent to `query_offset(query_str, limit, 0)`.
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }
530
531 pub async fn query_offset(
533 &self,
534 query_str: &str,
535 limit: usize,
536 offset: usize,
537 ) -> Result<crate::query::SearchResponse> {
538 let parser = self.query_parser();
539 let query = parser.parse(query_str).map_err(Error::Query)?;
540 self.search_offset(query.as_ref(), limit, offset).await
541 }
542}
543
544impl<D: Directory> Index<SliceCachingDirectory<D>> {
546 pub async fn open_with_cache(
551 directory: D,
552 config: IndexConfig,
553 cache_max_bytes: usize,
554 ) -> Result<Self> {
555 let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
556
557 let cache_path = Path::new(SLICE_CACHE_FILENAME);
559 if let Ok(true) = caching_dir.inner().exists(cache_path).await
560 && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
561 && let Ok(bytes) = slice.read_bytes().await
562 {
563 let _ = caching_dir.deserialize(bytes.as_slice());
564 }
565
566 Self::open(caching_dir, config).await
567 }
568
569 #[cfg(feature = "native")]
574 pub async fn save_slice_cache(&self) -> Result<()>
575 where
576 D: crate::directories::DirectoryWriter,
577 {
578 let cache_data = self.directory.serialize();
579 let cache_path = Path::new(SLICE_CACHE_FILENAME);
580 self.directory
581 .inner()
582 .write(cache_path, &cache_data)
583 .await?;
584 Ok(())
585 }
586
    /// Returns the statistics reported by the slice-caching directory.
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
591}
592
/// Opens the index through a fresh slice cache of at most `cache_max_bytes`
/// and saves that cache to the directory.
///
/// Opening the index is the only warm-up performed in this function, so the
/// saved cache holds whatever was cached while the index was being opened.
///
/// # Errors
///
/// Propagates failures from opening the index or writing the cache file.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let warmed = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;

    warmed.save_slice_cache().await
}
619
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    /// Creates another handle on the same index.
    ///
    /// The directory, schema, tokenizers, thread pool and trained structures
    /// are shared through their `Arc`s. The segment list is copied into a new
    /// lock, so a later `reload` on one handle does not affect the other. The
    /// clone starts with a fresh global statistics cache.
    fn clone(&self) -> Self {
        // Copy the reader list while holding the read lock only briefly.
        let segment_snapshot = self.segments.read().to_vec();

        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(segment_snapshot),
            default_fields: self.default_fields.to_vec(),
            tokenizers: Arc::clone(&self.tokenizers),
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids: self.trained_centroids.clone(),
            trained_codebooks: self.trained_codebooks.clone(),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
637
638#[cfg(test)]
639mod tests {
640 use super::*;
641 use crate::directories::RamDirectory;
642 use crate::dsl::SchemaBuilder;
643
644 #[tokio::test]
645 async fn test_index_create_and_search() {
646 let mut schema_builder = SchemaBuilder::default();
647 let title = schema_builder.add_text_field("title", true, true);
648 let body = schema_builder.add_text_field("body", true, true);
649 let schema = schema_builder.build();
650
651 let dir = RamDirectory::new();
652 let config = IndexConfig::default();
653
654 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
656 .await
657 .unwrap();
658
659 let mut doc1 = Document::new();
660 doc1.add_text(title, "Hello World");
661 doc1.add_text(body, "This is the first document");
662 writer.add_document(doc1).await.unwrap();
663
664 let mut doc2 = Document::new();
665 doc2.add_text(title, "Goodbye World");
666 doc2.add_text(body, "This is the second document");
667 writer.add_document(doc2).await.unwrap();
668
669 writer.commit().await.unwrap();
670
671 let index = Index::open(dir, config).await.unwrap();
673 assert_eq!(index.num_docs(), 2);
674
675 let postings = index.get_postings(title, b"world").await.unwrap();
677 assert_eq!(postings.len(), 1); assert_eq!(postings[0].1.doc_count(), 2); let doc = index.doc(0).await.unwrap().unwrap();
682 assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
683 }
684
685 #[tokio::test]
686 async fn test_multiple_segments() {
687 let mut schema_builder = SchemaBuilder::default();
688 let title = schema_builder.add_text_field("title", true, true);
689 let schema = schema_builder.build();
690
691 let dir = RamDirectory::new();
692 let config = IndexConfig {
693 max_docs_per_segment: 5, ..Default::default()
695 };
696
697 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
698 .await
699 .unwrap();
700
701 for batch in 0..3 {
703 for i in 0..5 {
704 let mut doc = Document::new();
705 doc.add_text(title, format!("Document {} batch {}", i, batch));
706 writer.add_document(doc).await.unwrap();
707 }
708 writer.commit().await.unwrap();
709 }
710
711 let index = Index::open(dir, config).await.unwrap();
713 assert_eq!(index.num_docs(), 15);
714 assert_eq!(index.segment_readers().len(), 3);
715 }
716
717 #[tokio::test]
718 async fn test_segment_merge() {
719 let mut schema_builder = SchemaBuilder::default();
720 let title = schema_builder.add_text_field("title", true, true);
721 let schema = schema_builder.build();
722
723 let dir = RamDirectory::new();
724 let config = IndexConfig {
725 max_docs_per_segment: 3,
726 ..Default::default()
727 };
728
729 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
730 .await
731 .unwrap();
732
733 for i in 0..9 {
735 let mut doc = Document::new();
736 doc.add_text(title, format!("Document {}", i));
737 writer.add_document(doc).await.unwrap();
738 }
739 writer.commit().await.unwrap();
740
741 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
743 assert_eq!(index.segment_readers().len(), 3);
744
745 let writer = IndexWriter::open(dir.clone(), config.clone())
747 .await
748 .unwrap();
749 writer.force_merge().await.unwrap();
750
751 let index = Index::open(dir, config).await.unwrap();
753 assert_eq!(index.segment_readers().len(), 1);
754 assert_eq!(index.num_docs(), 9);
755
756 for i in 0..9 {
758 let doc = index.doc(i).await.unwrap().unwrap();
759 assert_eq!(
760 doc.get_first(title).unwrap().as_text(),
761 Some(format!("Document {}", i).as_str())
762 );
763 }
764 }
765
766 #[tokio::test]
767 async fn test_match_query() {
768 let mut schema_builder = SchemaBuilder::default();
769 let title = schema_builder.add_text_field("title", true, true);
770 let body = schema_builder.add_text_field("body", true, true);
771 let schema = schema_builder.build();
772
773 let dir = RamDirectory::new();
774 let config = IndexConfig::default();
775
776 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
777 .await
778 .unwrap();
779
780 let mut doc1 = Document::new();
781 doc1.add_text(title, "rust programming");
782 doc1.add_text(body, "Learn rust language");
783 writer.add_document(doc1).await.unwrap();
784
785 let mut doc2 = Document::new();
786 doc2.add_text(title, "python programming");
787 doc2.add_text(body, "Learn python language");
788 writer.add_document(doc2).await.unwrap();
789
790 writer.commit().await.unwrap();
791
792 let index = Index::open(dir, config).await.unwrap();
793
794 let results = index.query("rust", 10).await.unwrap();
796 assert_eq!(results.hits.len(), 1);
797
798 let results = index.query("rust programming", 10).await.unwrap();
800 assert!(!results.hits.is_empty());
801
802 let hit = &results.hits[0];
804 assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
805
806 let doc = index.get_document(&hit.address).await.unwrap().unwrap();
808 assert!(
809 !doc.field_values().is_empty(),
810 "Doc should have field values"
811 );
812
813 let doc = index.doc(0).await.unwrap().unwrap();
815 assert!(
816 !doc.field_values().is_empty(),
817 "Doc should have field values"
818 );
819 }
820
821 #[tokio::test]
822 async fn test_slice_cache_warmup_and_load() {
823 use crate::directories::SliceCachingDirectory;
824
825 let mut schema_builder = SchemaBuilder::default();
826 let title = schema_builder.add_text_field("title", true, true);
827 let body = schema_builder.add_text_field("body", true, true);
828 let schema = schema_builder.build();
829
830 let dir = RamDirectory::new();
831 let config = IndexConfig::default();
832
833 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
835 .await
836 .unwrap();
837
838 for i in 0..10 {
839 let mut doc = Document::new();
840 doc.add_text(title, format!("Document {} about rust", i));
841 doc.add_text(body, format!("This is body text number {}", i));
842 writer.add_document(doc).await.unwrap();
843 }
844 writer.commit().await.unwrap();
845
846 let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
848 let index = Index::open(caching_dir, config.clone()).await.unwrap();
849
850 let results = index.query("rust", 10).await.unwrap();
852 assert!(!results.hits.is_empty());
853
854 let stats = index.slice_cache_stats();
856 assert!(stats.total_bytes > 0, "Cache should have data after search");
857
858 index.save_slice_cache().await.unwrap();
860
861 assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());
863
864 let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
866 .await
867 .unwrap();
868
869 let stats2 = index2.slice_cache_stats();
871 assert!(
872 stats2.total_bytes > 0,
873 "Cache should be prefilled from file"
874 );
875
876 let results2 = index2.query("rust", 10).await.unwrap();
878 assert_eq!(results.hits.len(), results2.hits.len());
879 }
880
881 #[tokio::test]
882 async fn test_multivalue_field_indexing_and_search() {
883 let mut schema_builder = SchemaBuilder::default();
884 let uris = schema_builder.add_text_field("uris", true, true);
885 let title = schema_builder.add_text_field("title", true, true);
886 let schema = schema_builder.build();
887
888 let dir = RamDirectory::new();
889 let config = IndexConfig::default();
890
891 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
893 .await
894 .unwrap();
895
896 let mut doc = Document::new();
897 doc.add_text(uris, "one");
898 doc.add_text(uris, "two");
899 doc.add_text(title, "Test Document");
900 writer.add_document(doc).await.unwrap();
901
902 let mut doc2 = Document::new();
904 doc2.add_text(uris, "three");
905 doc2.add_text(title, "Another Document");
906 writer.add_document(doc2).await.unwrap();
907
908 writer.commit().await.unwrap();
909
910 let index = Index::open(dir, config).await.unwrap();
912 assert_eq!(index.num_docs(), 2);
913
914 let doc = index.doc(0).await.unwrap().unwrap();
916 let all_uris: Vec<_> = doc.get_all(uris).collect();
917 assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
918 assert_eq!(all_uris[0].as_text(), Some("one"));
919 assert_eq!(all_uris[1].as_text(), Some("two"));
920
921 let json = doc.to_json(index.schema());
923 let uris_json = json.get("uris").unwrap();
924 assert!(uris_json.is_array(), "Multi-value field should be an array");
925 let uris_arr = uris_json.as_array().unwrap();
926 assert_eq!(uris_arr.len(), 2);
927 assert_eq!(uris_arr[0].as_str(), Some("one"));
928 assert_eq!(uris_arr[1].as_str(), Some("two"));
929
930 let results = index.query("uris:one", 10).await.unwrap();
932 assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
933 assert_eq!(results.hits[0].address.doc_id, 0);
934
935 let results = index.query("uris:two", 10).await.unwrap();
936 assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
937 assert_eq!(results.hits[0].address.doc_id, 0);
938
939 let results = index.query("uris:three", 10).await.unwrap();
940 assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
941 assert_eq!(results.hits[0].address.doc_id, 1);
942
943 let results = index.query("uris:nonexistent", 10).await.unwrap();
945 assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
946 }
947
948 #[tokio::test]
955 async fn test_wand_optimization_for_or_queries() {
956 use crate::query::{BooleanQuery, TermQuery};
957
958 let mut schema_builder = SchemaBuilder::default();
959 let content = schema_builder.add_text_field("content", true, true);
960 let schema = schema_builder.build();
961
962 let dir = RamDirectory::new();
963 let config = IndexConfig::default();
964
965 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
967 .await
968 .unwrap();
969
970 let mut doc = Document::new();
972 doc.add_text(content, "rust programming language is fast");
973 writer.add_document(doc).await.unwrap();
974
975 let mut doc = Document::new();
977 doc.add_text(content, "rust is a systems language");
978 writer.add_document(doc).await.unwrap();
979
980 let mut doc = Document::new();
982 doc.add_text(content, "programming is fun");
983 writer.add_document(doc).await.unwrap();
984
985 let mut doc = Document::new();
987 doc.add_text(content, "python is easy to learn");
988 writer.add_document(doc).await.unwrap();
989
990 let mut doc = Document::new();
992 doc.add_text(content, "rust rust programming programming systems");
993 writer.add_document(doc).await.unwrap();
994
995 writer.commit().await.unwrap();
996
997 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
999
1000 let or_query = BooleanQuery::new()
1002 .should(TermQuery::text(content, "rust"))
1003 .should(TermQuery::text(content, "programming"));
1004
1005 let results = index.search(&or_query, 10).await.unwrap();
1006
1007 assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");
1009
1010 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1011 assert!(doc_ids.contains(&0), "Should find doc 0");
1012 assert!(doc_ids.contains(&1), "Should find doc 1");
1013 assert!(doc_ids.contains(&2), "Should find doc 2");
1014 assert!(doc_ids.contains(&4), "Should find doc 4");
1015 assert!(
1016 !doc_ids.contains(&3),
1017 "Should NOT find doc 3 (only has 'python')"
1018 );
1019
1020 let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));
1022
1023 let results = index.search(&single_query, 10).await.unwrap();
1024 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1025
1026 let must_query = BooleanQuery::new()
1028 .must(TermQuery::text(content, "rust"))
1029 .should(TermQuery::text(content, "programming"));
1030
1031 let results = index.search(&must_query, 10).await.unwrap();
1032 assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");
1034
1035 let must_not_query = BooleanQuery::new()
1037 .should(TermQuery::text(content, "rust"))
1038 .should(TermQuery::text(content, "programming"))
1039 .must_not(TermQuery::text(content, "systems"));
1040
1041 let results = index.search(&must_not_query, 10).await.unwrap();
1042 let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
1044 assert!(
1045 !doc_ids.contains(&1),
1046 "Should NOT find doc 1 (has 'systems')"
1047 );
1048 assert!(
1049 !doc_ids.contains(&4),
1050 "Should NOT find doc 4 (has 'systems')"
1051 );
1052
1053 let or_query = BooleanQuery::new()
1055 .should(TermQuery::text(content, "rust"))
1056 .should(TermQuery::text(content, "programming"));
1057
1058 let results = index.search(&or_query, 2).await.unwrap();
1059 assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
1060
1061 }
1064
1065 #[tokio::test]
1067 async fn test_wand_results_match_standard_boolean() {
1068 use crate::query::{BooleanQuery, TermQuery, WandOrQuery};
1069
1070 let mut schema_builder = SchemaBuilder::default();
1071 let content = schema_builder.add_text_field("content", true, true);
1072 let schema = schema_builder.build();
1073
1074 let dir = RamDirectory::new();
1075 let config = IndexConfig::default();
1076
1077 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1078 .await
1079 .unwrap();
1080
1081 for i in 0..10 {
1083 let mut doc = Document::new();
1084 let text = match i % 4 {
1085 0 => "apple banana cherry",
1086 1 => "apple orange",
1087 2 => "banana grape",
1088 _ => "cherry date",
1089 };
1090 doc.add_text(content, text);
1091 writer.add_document(doc).await.unwrap();
1092 }
1093
1094 writer.commit().await.unwrap();
1095 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
1096
1097 let wand_query = WandOrQuery::new(content).term("apple").term("banana");
1099
1100 let bool_query = BooleanQuery::new()
1101 .should(TermQuery::text(content, "apple"))
1102 .should(TermQuery::text(content, "banana"));
1103
1104 let wand_results = index.search(&wand_query, 10).await.unwrap();
1105 let bool_results = index.search(&bool_query, 10).await.unwrap();
1106
1107 assert_eq!(
1109 wand_results.hits.len(),
1110 bool_results.hits.len(),
1111 "WAND and Boolean should find same number of docs"
1112 );
1113
1114 let wand_docs: std::collections::HashSet<u32> =
1115 wand_results.hits.iter().map(|h| h.address.doc_id).collect();
1116 let bool_docs: std::collections::HashSet<u32> =
1117 bool_results.hits.iter().map(|h| h.address.doc_id).collect();
1118
1119 assert_eq!(
1120 wand_docs, bool_docs,
1121 "WAND and Boolean should find same documents"
1122 );
1123 }
1124
    /// Exercises a dense-vector field configured with `build_threshold: Some(50)`.
    ///
    /// With 30 documents committed, no trained centroids are loaded but vector
    /// search still returns results. After 30 more documents the writer
    /// reports the vector index as built and a reopened index has centroids
    /// for the field. An explicit `build_vector_index` call afterwards
    /// succeeds and the index is still reported as built.
    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                store_raw: true,
                num_clusters: Some(4),
                nprobe: 2,
                mrl_dim: None,
                build_threshold: Some(50),
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        // Phase 1: 30 documents, below the threshold of 50.
        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            // Every component of the 8-dimensional vector has the same value.
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.is_empty(),
            "Should not have trained centroids below threshold"
        );

        // Vector search must work even without trained centroids.
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        // Phase 2: 30 more documents bring the total to 60, above the threshold.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        assert!(
            writer.is_vector_index_built(embedding).await,
            "Vector index should be built after crossing threshold"
        );

        // A reopened index loads the trained centroids, keyed by the field id.
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.contains_key(&embedding.0),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Phase 3: an explicit build on the already-built index succeeds.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.is_vector_index_built(embedding).await);
    }
1244}