1use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12#[cfg(feature = "native")]
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16#[cfg(feature = "native")]
17use crate::directories::DirectoryWriter;
18use crate::directories::{Directory, SliceCachingDirectory};
19use crate::dsl::{Document, Field, Schema};
20use crate::error::{Error, Result};
21#[cfg(feature = "native")]
22use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentMerger};
23use crate::segment::{SegmentId, SegmentReader};
24use crate::structures::BlockPostingList;
25#[cfg(feature = "native")]
26use crate::tokenizer::BoxedTokenizer;
27
28#[cfg(feature = "native")]
29use tokio::sync::Mutex as AsyncMutex;
30
/// File name under which a serialized slice cache is persisted inside the
/// index directory (see `Index::open_with_cache` / `save_slice_cache`).
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
33
/// Tuning knobs shared by `Index` (read side) and `IndexWriter` (write side).
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Size of the rayon pool built in `Index::open` (native builds only).
    pub num_threads: usize,
    /// Threads used for indexing. NOTE(review): not read anywhere in this
    /// file — presumably consumed by the segment builder; confirm.
    pub num_indexing_threads: usize,
    /// Threads used for compression. NOTE(review): also not read in this
    /// file — confirm against the segment-build pipeline.
    pub num_compression_threads: usize,
    /// Number of term-dictionary cache blocks handed to `SegmentReader::open`.
    pub term_cache_blocks: usize,
    /// Document-store cache blocks. NOTE(review): not read in this file.
    pub store_cache_blocks: usize,
    /// `IndexWriter::add_document` flushes the in-progress segment once it
    /// reaches this many documents.
    pub max_docs_per_segment: u32,
    /// Segment count at which `IndexWriter` merges everything into one.
    pub merge_threshold: usize,
    /// Index layout/optimization settings forwarded to the structures layer.
    pub optimization: crate::structures::IndexOptimization,
}
54
55impl Default for IndexConfig {
56 fn default() -> Self {
57 #[cfg(feature = "native")]
58 let cpus = num_cpus::get().max(1);
59 #[cfg(not(feature = "native"))]
60 let cpus = 1;
61
62 Self {
63 num_threads: cpus,
64 num_indexing_threads: 1,
65 num_compression_threads: cpus,
66 term_cache_blocks: 256,
67 store_cache_blocks: 32,
68 max_docs_per_segment: 100_000,
69 merge_threshold: 5,
70 optimization: crate::structures::IndexOptimization::default(),
71 }
72 }
73}
74
/// Read-side handle over an on-disk index: schema, open segment readers, and
/// query-time helpers. Cheap to share; heavy state lives behind `Arc`s.
pub struct Index<D: Directory> {
    // Storage abstraction the index was opened from.
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    // Current snapshot of segment readers; swapped wholesale by `reload`.
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    // Fields searched when a query does not name a field explicitly.
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    // Rayon pool for `spawn_blocking`; only exists on native builds.
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
89
90impl<D: Directory> Index<D> {
    /// Opens an existing index stored in `directory`.
    ///
    /// Reads `schema.json`, opens every segment listed in `segments.json`,
    /// and (on native builds) creates a rayon pool with `config.num_threads`
    /// workers for blocking work.
    ///
    /// # Errors
    /// Fails if the schema is missing or unparsable, a segment cannot be
    /// opened, or (native) the thread pool cannot be built.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        // schema.json is the source of truth for field definitions.
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let segments = Self::load_segments(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Default search fields: honor the schema's explicit list when
        // present, otherwise fall back to every indexed text field.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            #[cfg(feature = "native")]
            thread_pool,
        })
    }
138
    /// Opens every segment listed in `segments.json`, assigning each reader a
    /// contiguous global doc-id offset (segment order defines global order).
    ///
    /// Returns an empty list when the manifest file does not exist yet.
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        let segments_path = Path::new("segments.json");
        if !directory.exists(segments_path).await? {
            return Ok(Vec::new());
        }

        let segments_slice = directory.open_read(segments_path).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let mut segments = Vec::new();
        // Running global doc-id offset; each segment starts where the
        // previous one ended.
        let mut doc_id_offset = 0u32;

        for id_str in segment_ids {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok(segments)
    }
176
177 pub fn schema(&self) -> &Schema {
179 &self.schema
180 }
181
182 pub fn directory(&self) -> &D {
184 &self.directory
185 }
186
187 pub fn num_docs(&self) -> u32 {
189 self.segments.read().iter().map(|s| s.num_docs()).sum()
190 }
191
192 pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
194 let segments = self.segments.read().clone();
195
196 let mut offset = 0u32;
197 for segment in segments.iter() {
198 let segment_docs = segment.meta().num_docs;
199 if doc_id < offset + segment_docs {
200 let local_doc_id = doc_id - offset;
201 return segment.doc(local_doc_id).await;
202 }
203 offset += segment_docs;
204 }
205
206 Ok(None)
207 }
208
209 pub async fn get_postings(
211 &self,
212 field: Field,
213 term: &[u8],
214 ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
215 let segments = self.segments.read().clone();
216 let mut results = Vec::new();
217
218 for segment in segments.iter() {
219 if let Some(postings) = segment.get_postings(field, term).await? {
220 results.push((Arc::clone(segment), postings));
221 }
222 }
223
224 Ok(results)
225 }
226
    /// Runs `f` on the index's rayon pool and awaits its result without
    /// blocking the async runtime.
    ///
    /// # Panics
    /// Panics if the pooled task panics (the oneshot sender is dropped and
    /// `rx.await` returns an error).
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            // Receiver may have been dropped if the caller was cancelled;
            // ignore the send error in that case.
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }
241
242 pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
244 self.segments.read().clone()
245 }
246
247 pub async fn reload(&self) -> Result<()> {
249 let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
250 *self.segments.write() = new_segments;
251 Ok(())
252 }
253
254 pub async fn search(
256 &self,
257 query: &dyn crate::query::Query,
258 limit: usize,
259 ) -> Result<Vec<crate::query::SearchResult>> {
260 let segments = self.segments.read().clone();
261 let mut all_results = Vec::new();
262
263 for segment in &segments {
264 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
265 all_results.extend(results);
266 }
267
268 all_results.sort_by(|a, b| {
270 b.score
271 .partial_cmp(&a.score)
272 .unwrap_or(std::cmp::Ordering::Equal)
273 });
274 all_results.truncate(limit);
275
276 Ok(all_results)
277 }
278
279 pub async fn search_with_addresses(
281 &self,
282 query: &dyn crate::query::Query,
283 limit: usize,
284 ) -> Result<crate::query::SearchResponse> {
285 let segments = self.segments.read().clone();
286 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
287
288 for segment in &segments {
289 let segment_id = segment.meta().id;
290 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
291 for result in results {
292 all_results.push((segment_id, result));
293 }
294 }
295
296 all_results.sort_by(|a, b| {
298 b.1.score
299 .partial_cmp(&a.1.score)
300 .unwrap_or(std::cmp::Ordering::Equal)
301 });
302 all_results.truncate(limit);
303
304 let total_hits = all_results.len() as u32;
305 let hits: Vec<crate::query::SearchHit> = all_results
306 .into_iter()
307 .map(|(segment_id, result)| crate::query::SearchHit {
308 address: crate::query::DocAddress::new(segment_id, result.doc_id),
309 score: result.score,
310 })
311 .collect();
312
313 Ok(crate::query::SearchResponse { hits, total_hits })
314 }
315
316 pub async fn get_document(
318 &self,
319 address: &crate::query::DocAddress,
320 ) -> Result<Option<Document>> {
321 let segment_id = address
322 .segment_id_u128()
323 .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
324
325 let segments = self.segments.read().clone();
326 for segment in &segments {
327 if segment.meta().id == segment_id {
328 return segment.doc(address.doc_id).await;
329 }
330 }
331
332 Ok(None)
333 }
334
335 pub fn default_fields(&self) -> &[crate::Field] {
337 &self.default_fields
338 }
339
340 pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
342 self.default_fields = fields;
343 }
344
345 pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
347 &self.tokenizers
348 }
349
350 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
355 let query_routers = self.schema.query_routers();
357 if !query_routers.is_empty() {
358 if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
360 return crate::dsl::QueryLanguageParser::with_router(
361 Arc::clone(&self.schema),
362 self.default_fields.clone(),
363 Arc::clone(&self.tokenizers),
364 router,
365 );
366 }
367 }
368
369 crate::dsl::QueryLanguageParser::new(
371 Arc::clone(&self.schema),
372 self.default_fields.clone(),
373 Arc::clone(&self.tokenizers),
374 )
375 }
376
377 pub async fn query(
383 &self,
384 query_str: &str,
385 limit: usize,
386 ) -> Result<crate::query::SearchResponse> {
387 let parser = self.query_parser();
388 let query = parser.parse(query_str).map_err(Error::Query)?;
389 self.search_with_addresses(query.as_ref(), limit).await
390 }
391}
392
impl<D: Directory> Index<SliceCachingDirectory<D>> {
    /// Opens an index wrapped in a slice cache, pre-seeding the cache from a
    /// previously saved `index.slicecache` file when one exists.
    ///
    /// Cache-restore failures are deliberately ignored: the cache is purely
    /// an optimization and the index must still open cold without it.
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            // Best effort: a corrupt or stale cache file means a cold cache.
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

    /// Persists the current slice-cache contents to `index.slicecache` so a
    /// later `open_with_cache` can start warm.
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Size/usage statistics of the slice cache.
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}
441
/// Opens the index through a fresh slice cache and saves the resulting cache
/// contents to `index.slicecache` inside `directory`.
///
/// NOTE(review): only the reads performed by `Index::open` populate the cache
/// here — no queries are executed — so "warmup" covers metadata/segment-open
/// reads only. Confirm that is the intended scope.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    index.save_slice_cache().await?;

    Ok(())
}
468
/// Cheap clone: heavy state (directory, schema, tokenizers, thread pool) is
/// shared via `Arc`. Native-only because `thread_pool` only exists there.
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            // Each clone gets its own lock around a snapshot of the segment
            // list, so `reload` on one clone does not affect the others.
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
483
/// Write-side handle: buffers documents into an in-memory `SegmentBuilder`,
/// flushes full builders to on-disk segments, and merges segments when the
/// manifest reaches `merge_threshold`. Native builds only.
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    // Per-field tokenizer overrides applied to every new builder.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    // Lazily created; `None` until the first `add_document` after a commit.
    builder: AsyncMutex<Option<SegmentBuilder>>,
    // In-memory mirror of segments.json (hex segment ids, in order).
    segment_ids: AsyncMutex<Vec<String>>,
}
504
505#[cfg(feature = "native")]
506impl<D: DirectoryWriter> IndexWriter<D> {
    /// Creates a brand-new index in `directory` with default builder
    /// settings, writing fresh `schema.json` and empty `segments.json`.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Like [`IndexWriter::create`] but with explicit builder settings.
    ///
    /// NOTE(review): any existing `schema.json`/`segments.json` in the
    /// directory are overwritten, effectively truncating the index.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Start with an empty segment manifest.
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builder: AsyncMutex::new(None),
            segment_ids: AsyncMutex::new(Vec::new()),
        })
    }
546
    /// Opens an existing index for writing with default builder settings.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Like [`IndexWriter::open`] but with explicit builder settings.
    ///
    /// # Errors
    /// Fails if `schema.json` or `segments.json` is missing or unparsable.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        // Load the existing manifest so new commits append to it.
        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builder: AsyncMutex::new(None),
            segment_ids: AsyncMutex::new(segment_ids),
        })
    }
583
584 pub fn schema(&self) -> &Schema {
586 &self.schema
587 }
588
589 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
591 self.tokenizers.insert(field, Box::new(tokenizer));
592 }
593
594 pub async fn add_document(&self, doc: Document) -> Result<DocId> {
599 let mut builder_guard = self.builder.lock().await;
600
601 if builder_guard.is_none() {
603 let mut builder =
604 SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
605 for (field, tokenizer) in &self.tokenizers {
606 builder.set_tokenizer(*field, tokenizer.clone_box());
607 }
608 *builder_guard = Some(builder);
609 }
610
611 let builder = builder_guard.as_mut().unwrap();
612 let doc_id = builder.add_document(doc)?;
613
614 if builder.num_docs() >= self.config.max_docs_per_segment {
616 let full_builder = builder_guard.take().unwrap();
617 drop(builder_guard); self.commit_builder(full_builder).await?;
619 }
620
621 Ok(doc_id)
622 }
623
624 pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
626 let builder_guard = self.builder.lock().await;
627 builder_guard.as_ref().map(|b| b.stats())
628 }
629
630 pub async fn commit(&self) -> Result<()> {
632 let mut builder_guard = self.builder.lock().await;
633
634 if let Some(builder) = builder_guard.take()
635 && builder.num_docs() > 0
636 {
637 drop(builder_guard);
638 self.commit_builder(builder).await?;
639 }
640
641 Ok(())
642 }
643
    /// Builds `builder` into a new on-disk segment, appends it to the
    /// manifest, rewrites `segments.json`, and triggers a merge when the
    /// segment count reaches `merge_threshold`.
    async fn commit_builder(&self, builder: SegmentBuilder) -> Result<()> {
        let segment_id = SegmentId::new();
        builder.build(self.directory.as_ref(), segment_id).await?;

        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.push(segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        if segment_ids.len() >= self.config.merge_threshold {
            // Release the manifest lock before merging; `maybe_merge`
            // re-acquires it and re-checks the threshold.
            drop(segment_ids);
            self.maybe_merge().await?;
        }

        Ok(())
    }
666
    /// Merges *all* current segments into a single new segment once the
    /// manifest holds at least `merge_threshold` entries, then rewrites
    /// `segments.json` and best-effort deletes the old segment files.
    async fn maybe_merge(&self) -> Result<()> {
        let segment_ids = self.segment_ids.lock().await;

        // Re-check under the lock: a concurrent merge may already have
        // collapsed the manifest below the threshold.
        if segment_ids.len() < self.config.merge_threshold {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        // Release the manifest lock for the duration of the slow I/O below.
        drop(segment_ids);

        // Open every segment with contiguous doc-id offsets for the merger.
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // NOTE(review): the manifest is cleared and replaced wholesale here.
        // A segment committed by another task between the `drop` above and
        // this re-lock would silently vanish from segments.json — confirm
        // whether concurrent commits during a merge are possible.
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Deleting stale segment files is best effort; failures are ignored.
        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }
724
    /// Commits any buffered documents, then unconditionally merges all
    /// segments into one (no-op when there are 0 or 1 segments).
    ///
    /// NOTE(review): the merge body duplicates `maybe_merge` almost line for
    /// line (only the threshold guard differs) — candidate for a shared
    /// private helper.
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;

        let segment_ids = self.segment_ids.lock().await;
        if segment_ids.len() <= 1 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        // Release the manifest lock for the duration of the slow I/O below.
        drop(segment_ids);

        // Open every segment with contiguous doc-id offsets for the merger.
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Replace the manifest with the single merged segment.
        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Deleting stale segment files is best effort; failures are ignored.
        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }
784}
785
786#[cfg(test)]
787mod tests {
788 use super::*;
789 use crate::directories::RamDirectory;
790 use crate::dsl::SchemaBuilder;
791
792 #[tokio::test]
793 async fn test_index_create_and_search() {
794 let mut schema_builder = SchemaBuilder::default();
795 let title = schema_builder.add_text_field("title", true, true);
796 let body = schema_builder.add_text_field("body", true, true);
797 let schema = schema_builder.build();
798
799 let dir = RamDirectory::new();
800 let config = IndexConfig::default();
801
802 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
804 .await
805 .unwrap();
806
807 let mut doc1 = Document::new();
808 doc1.add_text(title, "Hello World");
809 doc1.add_text(body, "This is the first document");
810 writer.add_document(doc1).await.unwrap();
811
812 let mut doc2 = Document::new();
813 doc2.add_text(title, "Goodbye World");
814 doc2.add_text(body, "This is the second document");
815 writer.add_document(doc2).await.unwrap();
816
817 writer.commit().await.unwrap();
818
819 let index = Index::open(dir, config).await.unwrap();
821 assert_eq!(index.num_docs(), 2);
822
823 let postings = index.get_postings(title, b"world").await.unwrap();
825 assert_eq!(postings.len(), 1); assert_eq!(postings[0].1.doc_count(), 2); let doc = index.doc(0).await.unwrap().unwrap();
830 assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
831 }
832
833 #[tokio::test]
834 async fn test_multiple_segments() {
835 let mut schema_builder = SchemaBuilder::default();
836 let title = schema_builder.add_text_field("title", true, true);
837 let schema = schema_builder.build();
838
839 let dir = RamDirectory::new();
840 let config = IndexConfig {
841 max_docs_per_segment: 5, merge_threshold: 10, ..Default::default()
844 };
845
846 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
847 .await
848 .unwrap();
849
850 for batch in 0..3 {
852 for i in 0..5 {
853 let mut doc = Document::new();
854 doc.add_text(title, format!("Document {} batch {}", i, batch));
855 writer.add_document(doc).await.unwrap();
856 }
857 writer.commit().await.unwrap();
858 }
859
860 let index = Index::open(dir, config).await.unwrap();
862 assert_eq!(index.num_docs(), 15);
863 assert_eq!(index.segment_readers().len(), 3);
864 }
865
    /// Writes 9 docs across 3 segments, then verifies `force_merge` collapses
    /// them into one segment while preserving every document in order.
    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            // High threshold so no automatic merge fires during indexing.
            merge_threshold: 100,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // 9 docs / 3 per segment = 3 segments before the merge.
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Global doc ids must still map to the original documents.
        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }
915
    /// Exercises the string query path (`Index::query`): parsing, addressed
    /// hits, and fetching documents both by address and by global doc id.
    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        // "rust" only matches doc1 (title + body are both default fields).
        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        // Fetch by (segment_id, doc_id) address...
        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        // ...and by global doc id.
        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }
970
    /// Warms a slice cache by querying through a `SliceCachingDirectory`,
    /// saves it, then verifies `open_with_cache` restores the saved cache.
    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Open through a caching wrapper and run a query to populate it.
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        index.save_slice_cache().await.unwrap();

        // The cache file must land in the *inner* directory.
        assert!(
            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
                .await
                .unwrap()
        );

        // Re-open via the cache-restoring path; the cache starts pre-filled.
        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        // Same query must return the same number of hits on the warm index.
        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }
1034
    /// Verifies multi-valued text fields: storage preserves all values in
    /// order, JSON export renders them as an array, and each value is
    /// individually searchable.
    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        // doc 0 carries two values in `uris`.
        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        // doc 1 carries a single value.
        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // Stored values are returned in insertion order.
        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        // JSON export renders a multi-value field as an array.
        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        // Every individual value is searchable via field-qualified queries.
        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
1101}