use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;
use rustc_hash::FxHashMap;

use crate::DocId;
use crate::directories::{Directory, SliceCachingDirectory};
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentId, SegmentReader};
use crate::structures::{BlockPostingList, CoarseCentroids, PQCodebook};

#[cfg(feature = "native")]
mod vector_builder;
#[cfg(feature = "native")]
mod writer;
#[cfg(feature = "native")]
pub use writer::IndexWriter;

mod metadata;
pub use metadata::{FieldVectorMeta, INDEX_META_FILENAME, IndexMetadata, VectorIndexState};

#[cfg(feature = "native")]
mod helpers;
#[cfg(feature = "native")]
pub use helpers::{
    IndexingStats, SchemaConfig, SchemaFieldConfig, create_index_at_path, create_index_from_sdl,
    index_documents_from_reader, index_json_document, parse_schema,
};

/// File name under which the serialized slice cache is persisted.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

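/// Configuration for opening an [`Index`] and building segments.
///
/// A minimal sketch of overriding a couple of knobs while keeping the other
/// defaults (the values are illustrative, not recommendations):
///
/// ```ignore
/// let config = IndexConfig {
///     num_threads: 4,
///     max_docs_per_segment: 500_000,
///     ..Default::default()
/// };
/// ```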
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Threads available to the index's internal thread pool.
    pub num_threads: usize,
    /// Threads dedicated to indexing documents.
    pub num_indexing_threads: usize,
    /// Threads dedicated to compressing segment data.
    pub num_compression_threads: usize,
    /// Term-dictionary blocks cached per segment.
    pub term_cache_blocks: usize,
    /// Document-store blocks cached per segment.
    pub store_cache_blocks: usize,
    /// Maximum documents per segment before a new segment is started.
    pub max_docs_per_segment: u32,
    /// Policy that selects which segments to merge.
    pub merge_policy: Box<dyn crate::merge::MergePolicy>,
    /// Index-wide optimization settings.
    pub optimization: crate::structures::IndexOptimization,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let cpus = num_cpus::get().max(1);
        #[cfg(not(feature = "native"))]
        let cpus = 1;

        Self {
            num_threads: cpus,
            num_indexing_threads: 1,
            num_compression_threads: cpus,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_docs_per_segment: 100_000,
            merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
            optimization: crate::structures::IndexOptimization::default(),
        }
    }
}

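/// A searchable index: a schema plus a set of immutable segment readers over
/// a [`Directory`].
///
/// Sketch of the typical read path (error handling elided; any directory
/// implementation works here):
///
/// ```ignore
/// let index = Index::open(directory, IndexConfig::default()).await?;
/// let response = index.query("title:hello", 10).await?;
/// for hit in &response.hits {
///     let doc = index.get_document(&hit.address).await?;
/// }
/// ```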
pub struct Index<D: Directory> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Lazily built corpus-wide statistics, shared across searches.
    global_stats: crate::query::GlobalStatsCache,
    /// Trained coarse centroids per vector field, if vector training has run.
    trained_centroids: FxHashMap<u32, Arc<CoarseCentroids>>,
    /// Trained product-quantization codebooks per vector field.
    trained_codebooks: FxHashMap<u32, Arc<PQCodebook>>,
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<D: Directory> Index<D> {
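    /// Opens an existing index in `directory`: reads `schema.json` and the
    /// index metadata, then materializes one reader per live segment.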
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let (segments, trained_centroids, trained_codebooks) =
            Self::load_segments_and_trained(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Fall back to all indexed text fields when the schema does not
        // declare explicit default search fields.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids,
            trained_codebooks,
            #[cfg(feature = "native")]
            thread_pool,
        })
    }

    async fn load_segments_and_trained(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<(
        Vec<Arc<SegmentReader>>,
        FxHashMap<u32, Arc<CoarseCentroids>>,
        FxHashMap<u32, Arc<PQCodebook>>,
    )> {
        let meta = Self::load_metadata(directory).await?;

        // Trained vector structures (centroids, codebooks) are optional.
        let (trained_centroids, trained_codebooks) =
            meta.load_trained_structures(directory.as_ref()).await;

        // Open segments in metadata order, assigning each a contiguous
        // global doc-id range.
        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in meta.segments {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok((segments, trained_centroids, trained_codebooks))
    }

    async fn load_metadata(directory: &Arc<D>) -> Result<IndexMetadata> {
        let meta_path = Path::new(INDEX_META_FILENAME);
        if directory.exists(meta_path).await.unwrap_or(false) {
            let slice = directory.open_read(meta_path).await?;
            let bytes = slice.read_bytes().await?;
            let meta: IndexMetadata = serde_json::from_slice(bytes.as_slice())
                .map_err(|e| Error::Serialization(e.to_string()))?;
            return Ok(meta);
        }
        // No metadata file yet: treat as an empty index.
        Ok(IndexMetadata::new())
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the underlying directory.
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Total number of documents across all open segments.
    pub fn num_docs(&self) -> u32 {
        self.segments.read().iter().map(|s| s.num_docs()).sum()
    }

    /// Fetches a document by global doc id, resolving it to the owning
    /// segment's local id space.
    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
        let segments = self.segments.read().clone();

        let mut offset = 0u32;
        for segment in segments.iter() {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }

        Ok(None)
    }

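    /// Looks up the posting list for `term` in `field` across all segments,
    /// returning one `(reader, postings)` pair per segment that contains it.
    /// Sketch:
    ///
    /// ```ignore
    /// let postings = index.get_postings(title, b"world").await?;
    /// ```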
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
        let segments = self.segments.read().clone();
        let mut results = Vec::new();

        for segment in segments.iter() {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((Arc::clone(segment), postings));
            }
        }

        Ok(results)
    }

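    /// Runs a blocking closure on the index's thread pool and awaits its
    /// result without blocking the async executor. Sketch (the closure body
    /// is hypothetical):
    ///
    /// ```ignore
    /// let sum = index.spawn_blocking(move || expensive_computation()).await;
    /// ```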
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }

    /// Returns the current set of segment readers.
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }

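    /// Re-reads the index metadata and swaps in the current segment set,
    /// dropping readers for segments that have since been merged away.
    ///
    /// A typical polling loop pairs this with [`Self::needs_reload`]
    /// (sketch):
    ///
    /// ```ignore
    /// if index.needs_reload().await? {
    ///     index.reload().await?;
    /// }
    /// ```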
    pub async fn reload(&self) -> Result<()> {
        let meta = Self::load_metadata(&self.directory).await?;

        let mut new_segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in meta.segments {
            let segment_id = match SegmentId::from_hex(&id_str) {
                Some(id) => id,
                None => {
                    log::warn!("Invalid segment ID in metadata: {}", id_str);
                    continue;
                }
            };

            match SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_id_offset,
                self.config.term_cache_blocks,
            )
            .await
            {
                Ok(reader) => {
                    doc_id_offset += reader.meta().num_docs;
                    new_segments.push(Arc::new(reader));
                }
                Err(e) => {
                    log::warn!(
                        "Could not open segment {}: {:?} (may have been merged)",
                        id_str,
                        e
                    );
                }
            }
        }

        *self.segments.write() = new_segments;
        // Corpus-level statistics are stale once the segment set changes.
        self.global_stats.invalidate();
        Ok(())
    }

    /// Returns `true` if the on-disk metadata lists a different segment set
    /// than the one currently loaded.
    pub async fn needs_reload(&self) -> Result<bool> {
        let meta = Self::load_metadata(&self.directory).await?;
        let current_segments = self.segments.read();

        if meta.segments.len() != current_segments.len() {
            return Ok(true);
        }

        for (meta_id, reader) in meta.segments.iter().zip(current_segments.iter()) {
            let reader_id = SegmentId::from_u128(reader.meta().id).to_hex();
            if meta_id != &reader_id {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Returns the cached global statistics, if they have been built.
    pub fn global_stats(&self) -> Option<Arc<crate::query::GlobalStats>> {
        self.global_stats.get()
    }

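    /// Builds (or returns the cached) corpus-wide statistics: total document
    /// count, per-field average lengths, and document frequencies for text
    /// terms and sparse-vector dimensions, aggregated over all segments.
    /// Subsequent calls return the cached value until [`Self::reload`]
    /// invalidates it. Sketch:
    ///
    /// ```ignore
    /// let stats = index.build_global_stats().await?;
    /// ```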
    pub async fn build_global_stats(&self) -> Result<Arc<crate::query::GlobalStats>> {
        if let Some(stats) = self.global_stats.get() {
            return Ok(stats);
        }

        let segments = self.segments.read().clone();
        let schema = &self.schema;
        let mut builder = crate::query::GlobalStatsBuilder::new();

        // Per field: (sum of token counts, number of docs), used to compute
        // the corpus-wide average field length.
        let mut field_len_sums: rustc_hash::FxHashMap<u32, (u64, u64)> =
            rustc_hash::FxHashMap::default();

        for segment in &segments {
            let num_docs = segment.num_docs() as u64;
            builder.total_docs += num_docs;

            // Document frequencies for sparse-vector dimensions.
            for (&field_id, sparse_index) in segment.sparse_indexes() {
                for (dim_id, posting_list) in sparse_index.postings.iter().enumerate() {
                    if let Some(pl) = posting_list {
                        builder.add_sparse_df(
                            crate::dsl::Field(field_id),
                            dim_id as u32,
                            pl.doc_count() as u64,
                        );
                    }
                }
            }

            // Accumulate average field lengths, weighted by segment size.
            for (field, entry) in schema.fields() {
                if entry.indexed && entry.field_type == crate::dsl::FieldType::Text {
                    let avg_len = segment.avg_field_len(field);
                    let (sum, count) = field_len_sums.entry(field.0).or_insert((0, 0));
                    *sum += (avg_len * num_docs as f32) as u64;
                    *count += num_docs;
                }
            }

            // Document frequencies for text terms.
            for (field, term, doc_freq) in segment.all_terms_with_stats().await? {
                builder.add_text_df(field, term, doc_freq as u64);
            }
        }

        for (field_id, (sum, count)) in field_len_sums {
            if count > 0 {
                let global_avg = sum as f32 / count as f32;
                builder.set_avg_field_len(crate::dsl::Field(field_id), global_avg);
            }
        }

        let generation = self.global_stats.generation();
        let stats = builder.build(generation);
        self.global_stats.set_stats(stats);

        Ok(self.global_stats.get().unwrap())
    }

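    /// Searches every segment with `query` and returns the top `limit` hits,
    /// merged and sorted by descending score. Sketch:
    ///
    /// ```ignore
    /// let query = TermQuery::text(title, "hello");
    /// let response = index.search(&query, 10).await?;
    /// ```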
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_offset(query, limit, 0).await
    }

    /// Like [`Self::search`], but skips the first `offset` hits (for paging).
    pub async fn search_offset(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, offset, false).await
    }

    /// Like [`Self::search`], but also collects which fields matched for
    /// each hit.
    pub async fn search_with_matched_fields(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.search_internal(query, limit, 0, true).await
    }

    async fn search_internal(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
        offset: usize,
        collect_positions: bool,
    ) -> Result<crate::query::SearchResponse> {
        let segments = self.segments.read().clone();
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        // Each segment must return enough hits to cover the requested page.
        let fetch_limit = offset + limit;
        for segment in &segments {
            let segment_id = segment.meta().id;
            let results = if collect_positions {
                crate::query::search_segment_with_positions(segment.as_ref(), query, fetch_limit)
                    .await?
            } else {
                crate::query::search_segment(segment.as_ref(), query, fetch_limit).await?
            };
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        // Merge hits from all segments, best score first.
        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        let total_hits = all_results.len() as u32;

        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .skip(offset)
            .take(limit)
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
                matched_fields: result.extract_ordinals(),
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

    /// Fetches a stored document by its address (segment id plus
    /// segment-local doc id).
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<Document>> {
        let segment_id = address
            .segment_id_u128()
            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;

        let segments = self.segments.read().clone();
        for segment in &segments {
            if segment.meta().id == segment_id {
                return segment.doc(address.doc_id).await;
            }
        }

        Ok(None)
    }

    /// Fields searched when a query does not name a field explicitly.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Overrides the default search fields.
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    /// Returns the tokenizer registry used by this index.
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }

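    /// Builds a query parser over this index's schema, default fields, and
    /// tokenizers, honoring any field-routing rules the schema declares.
    /// Sketch of parsing and searching by hand:
    ///
    /// ```ignore
    /// let parser = index.query_parser();
    /// let query = parser.parse("title:rust").map_err(Error::Query)?;
    /// let response = index.search(query.as_ref(), 10).await?;
    /// ```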
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        // If the schema declares routing rules, try to build a router-aware
        // parser first; fall back to the plain parser otherwise.
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty() {
            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
                return crate::dsl::QueryLanguageParser::with_router(
                    Arc::clone(&self.schema),
                    self.default_fields.clone(),
                    Arc::clone(&self.tokenizers),
                    router,
                );
            }
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

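    /// Parses `query_str` with [`Self::query_parser`] and runs the search.
    ///
    /// ```ignore
    /// let response = index.query("title:rust", 10).await?;
    /// ```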
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        self.query_offset(query_str, limit, 0).await
    }

    /// Like [`Self::query`], but skips the first `offset` hits.
    pub async fn query_offset(
        &self,
        query_str: &str,
        limit: usize,
        offset: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser.parse(query_str).map_err(Error::Query)?;
        self.search_offset(query.as_ref(), limit, offset).await
    }
}

impl<D: Directory> Index<SliceCachingDirectory<D>> {
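    /// Opens an index behind a [`SliceCachingDirectory`], pre-filling the
    /// cache from a previously saved `SLICE_CACHE_FILENAME` snapshot when one
    /// exists. A warm-then-persist round trip looks like this (sketch; the
    /// cache size is illustrative):
    ///
    /// ```ignore
    /// let index = Index::open_with_cache(dir, IndexConfig::default(), 64 << 20).await?;
    /// index.query("title:rust", 10).await?; // populates the cache
    /// index.save_slice_cache().await?;      // persist it for the next open
    /// ```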
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        // Best-effort: pre-fill the cache from a saved snapshot, ignoring
        // any error (a cold cache is still correct).
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Serializes the current slice cache and writes it next to the index
    /// files, so a later [`Self::open_with_cache`] starts warm.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: crate::directories::DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Returns statistics about the slice cache (e.g. total cached bytes).
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}

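/// Convenience helper: opens the index through a fresh slice cache (which
/// records every read made while opening) and immediately persists that
/// cache, so subsequent opens start warm.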
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    index.save_slice_cache().await?;

    Ok(())
}

#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            // Each clone rebuilds its own stats cache lazily.
            global_stats: crate::query::GlobalStatsCache::new(),
            trained_centroids: self.trained_centroids.clone(),
            trained_codebooks: self.trained_codebooks.clone(),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // "world" appears in both documents' titles.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Three commits of five documents each -> three segments.
        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).await.unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Nine documents at three per segment -> three segments.
        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Documents keep their global ids and contents across the merge.
        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Fetch by address...
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // ...and by global doc id.
        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Warm the cache by opening and searching through a caching directory.
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        index.save_slice_cache().await.unwrap();

        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());

        // A fresh open should pre-fill the cache from the saved file.
        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Both values of the multi-valued field are stored and retrievable.
        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // JSON serialization renders a multi-valued field as an array.
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Every value of a multi-valued field is independently searchable.
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }

    #[tokio::test]
    async fn test_wand_optimization_for_or_queries() {
        use crate::query::{BooleanQuery, TermQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // doc 0: matches both terms
        let mut doc = Document::new();
        doc.add_text(content, "rust programming language is fast");
        writer.add_document(doc).await.unwrap();

        // doc 1: matches "rust" only
        let mut doc = Document::new();
        doc.add_text(content, "rust is a systems language");
        writer.add_document(doc).await.unwrap();

        // doc 2: matches "programming" only
        let mut doc = Document::new();
        doc.add_text(content, "programming is fun");
        writer.add_document(doc).await.unwrap();

        // doc 3: matches neither term
        let mut doc = Document::new();
        doc.add_text(content, "python is easy to learn");
        writer.add_document(doc).await.unwrap();

        // doc 4: repeats both terms
        let mut doc = Document::new();
        doc.add_text(content, "rust rust programming programming systems");
        writer.add_document(doc).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 10).await.unwrap();

        assert_eq!(results.hits.len(), 4, "Should find exactly 4 documents");

        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(doc_ids.contains(&0), "Should find doc 0");
        assert!(doc_ids.contains(&1), "Should find doc 1");
        assert!(doc_ids.contains(&2), "Should find doc 2");
        assert!(doc_ids.contains(&4), "Should find doc 4");
        assert!(
            !doc_ids.contains(&3),
            "Should NOT find doc 3 (only has 'python')"
        );

        let single_query = BooleanQuery::new().should(TermQuery::text(content, "rust"));

        let results = index.search(&single_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_query = BooleanQuery::new()
            .must(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&must_query, 10).await.unwrap();
        assert_eq!(results.hits.len(), 3, "Should find 3 documents with 'rust'");

        let must_not_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"))
            .must_not(TermQuery::text(content, "systems"));

        let results = index.search(&must_not_query, 10).await.unwrap();
        let doc_ids: Vec<u32> = results.hits.iter().map(|h| h.address.doc_id).collect();
        assert!(
            !doc_ids.contains(&1),
            "Should NOT find doc 1 (has 'systems')"
        );
        assert!(
            !doc_ids.contains(&4),
            "Should NOT find doc 4 (has 'systems')"
        );

        // A smaller limit truncates to the top-scoring hits.
        let or_query = BooleanQuery::new()
            .should(TermQuery::text(content, "rust"))
            .should(TermQuery::text(content, "programming"));

        let results = index.search(&or_query, 2).await.unwrap();
        assert_eq!(results.hits.len(), 2, "Should return only top 2 results");
    }

    #[tokio::test]
    async fn test_wand_results_match_standard_boolean() {
        use crate::query::{BooleanQuery, TermQuery, WandOrQuery};

        let mut schema_builder = SchemaBuilder::default();
        let content = schema_builder.add_text_field("content", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // Ten documents cycling through four term patterns.
        for i in 0..10 {
            let mut doc = Document::new();
            let text = match i % 4 {
                0 => "apple banana cherry",
                1 => "apple orange",
                2 => "banana grape",
                _ => "cherry date",
            };
            doc.add_text(content, text);
            writer.add_document(doc).await.unwrap();
        }

        writer.commit().await.unwrap();
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();

        let wand_query = WandOrQuery::new(content).term("apple").term("banana");

        let bool_query = BooleanQuery::new()
            .should(TermQuery::text(content, "apple"))
            .should(TermQuery::text(content, "banana"));

        let wand_results = index.search(&wand_query, 10).await.unwrap();
        let bool_results = index.search(&bool_query, 10).await.unwrap();

        assert_eq!(
            wand_results.hits.len(),
            bool_results.hits.len(),
            "WAND and Boolean should find same number of docs"
        );

        let wand_docs: std::collections::HashSet<u32> =
            wand_results.hits.iter().map(|h| h.address.doc_id).collect();
        let bool_docs: std::collections::HashSet<u32> =
            bool_results.hits.iter().map(|h| h.address.doc_id).collect();

        assert_eq!(
            wand_docs, bool_docs,
            "WAND and Boolean should find same documents"
        );
    }

    #[tokio::test]
    async fn test_vector_index_threshold_switch() {
        use crate::dsl::{DenseVectorConfig, VectorIndexType};

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let embedding = schema_builder.add_dense_vector_field_with_config(
            "embedding",
            true,
            true,
            DenseVectorConfig {
                dim: 8,
                index_type: VectorIndexType::IvfRaBitQ,
                store_raw: true,
                num_clusters: Some(4),
                nprobe: 2,
                mrl_dim: None,
                // The ANN index is only trained once this many vectors exist.
                build_threshold: Some(50),
            },
        );
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // 30 vectors: still below the build threshold of 50.
        for i in 0..30 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 30.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.is_empty(),
            "Should not have trained centroids below threshold"
        );

        // Below the threshold, search falls back to a flat scan.
        let query_vec: Vec<f32> = vec![0.5; 8];
        let segments = index.segment_readers();
        assert!(!segments.is_empty());

        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(!results.is_empty(), "Flat search should return results");

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();

        // 30 more vectors push the total past the threshold.
        for i in 30..60 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            let vec: Vec<f32> = (0..8).map(|_| (i as f32) / 60.0).collect();
            doc.add_dense_vector(embedding, vec);
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        assert!(
            writer.is_vector_index_built(embedding).await,
            "Vector index should be built after crossing threshold"
        );

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert!(
            index.trained_centroids.contains_key(&embedding.0),
            "Should have loaded trained centroids for embedding field"
        );

        let segments = index.segment_readers();
        let results = segments[0]
            .search_dense_vector(
                embedding,
                &query_vec,
                5,
                1,
                crate::query::MultiValueCombiner::Max,
            )
            .unwrap();
        assert!(
            !results.is_empty(),
            "Search should return results after build"
        );

        // Rebuilding explicitly is idempotent.
        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.build_vector_index().await.unwrap();
        assert!(writer.is_vector_index_built(embedding).await);
    }
}