use std::path::Path;
use std::sync::Arc;

use parking_lot::RwLock;
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

use crate::DocId;
#[cfg(feature = "native")]
use crate::directories::DirectoryWriter;
use crate::directories::{Directory, SliceCachingDirectory};
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
#[cfg(feature = "native")]
use crate::segment::{SegmentBuilder, SegmentMerger};
use crate::segment::{SegmentId, SegmentReader};
use crate::structures::BlockPostingList;
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;

#[cfg(feature = "native")]
use tokio::sync::Mutex as AsyncMutex;

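/// File name under which the serialized slice cache is persisted.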
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";

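/// Tuning parameters for opening, searching, and writing an index.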
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Total threads available to the internal rayon pool.
    pub num_threads: usize,
    /// Number of segment builders accepting documents in parallel.
    pub num_indexing_threads: usize,
    /// Threads used to compress segment data during builds.
    pub num_compression_threads: usize,
    /// Number of term-dictionary blocks cached per segment.
    pub term_cache_blocks: usize,
    /// Number of document-store blocks cached per segment.
    pub store_cache_blocks: usize,
    /// A segment builder is flushed once it holds this many documents.
    pub max_docs_per_segment: u32,
    /// Merging kicks in once this many segments accumulate.
    pub merge_threshold: usize,
}

impl Default for IndexConfig {
    fn default() -> Self {
        #[cfg(feature = "native")]
        let cpus = num_cpus::get().max(1);
        #[cfg(not(feature = "native"))]
        let cpus = 1;

        Self {
            num_threads: cpus,
            num_indexing_threads: 1,
            num_compression_threads: cpus,
            term_cache_blocks: 256,
            store_cache_blocks: 32,
            max_docs_per_segment: 100_000,
            merge_threshold: 5,
        }
    }
}

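/// A read-only, searchable view over the committed segments of an index
/// stored in a [`Directory`]. Call [`Index::reload`] to pick up segments
/// committed after the index was opened.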
pub struct Index<D: Directory> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Currently open segment readers, refreshed by [`Index::reload`].
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    /// Fields searched when a query does not name a field explicitly.
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}

impl<D: Directory> Index<D> {
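    /// Opens an existing index: loads `schema.json`, then every segment
    /// listed in `segments.json`.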
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let segments = Self::load_segments(&directory, &schema, &config).await?;

        #[cfg(feature = "native")]
        let thread_pool = {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(config.num_threads)
                .build()
                .map_err(|e| Error::Io(std::io::Error::other(e)))?;
            Arc::new(pool)
        };

        // Use the schema's explicit default fields if present; otherwise fall
        // back to every indexed text field.
        let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
            schema.default_fields().to_vec()
        } else {
            schema
                .fields()
                .filter(|(_, entry)| {
                    entry.indexed && entry.field_type == crate::dsl::FieldType::Text
                })
                .map(|(field, _)| field)
                .collect()
        };

        Ok(Self {
            directory,
            schema,
            config,
            segments: RwLock::new(segments),
            default_fields,
            tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
            #[cfg(feature = "native")]
            thread_pool,
        })
    }

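    /// Opens a reader for each segment listed in `segments.json`, assigning
    /// each segment a contiguous range of global doc ids.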
    async fn load_segments(
        directory: &Arc<D>,
        schema: &Arc<Schema>,
        config: &IndexConfig,
    ) -> Result<Vec<Arc<SegmentReader>>> {
        let segments_path = Path::new("segments.json");
        if !directory.exists(segments_path).await? {
            return Ok(Vec::new());
        }

        let segments_slice = directory.open_read(segments_path).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let mut segments = Vec::new();
        let mut doc_id_offset = 0u32;

        for id_str in segment_ids {
            let segment_id = SegmentId::from_hex(&id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                directory.as_ref(),
                segment_id,
                Arc::clone(schema),
                doc_id_offset,
                config.term_cache_blocks,
            )
            .await?;

            doc_id_offset += reader.meta().num_docs;
            segments.push(Arc::new(reader));
        }

        Ok(segments)
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the underlying directory.
    pub fn directory(&self) -> &D {
        &self.directory
    }

    /// Total number of documents across all open segments.
    pub fn num_docs(&self) -> u32 {
        self.segments.read().iter().map(|s| s.num_docs()).sum()
    }

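    /// Fetches a document by global doc id, translating it into a
    /// segment-local id first.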
    pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
        let segments = self.segments.read().clone();

        let mut offset = 0u32;
        for segment in segments.iter() {
            let segment_docs = segment.meta().num_docs;
            if doc_id < offset + segment_docs {
                let local_doc_id = doc_id - offset;
                return segment.doc(local_doc_id).await;
            }
            offset += segment_docs;
        }

        Ok(None)
    }

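    /// Looks up the posting list for `term` in `field` across all segments,
    /// returning one entry per segment in which the term occurs.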
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
        let segments = self.segments.read().clone();
        let mut results = Vec::new();

        for segment in segments.iter() {
            if let Some(postings) = segment.get_postings(field, term).await? {
                results.push((Arc::clone(segment), postings));
            }
        }

        Ok(results)
    }

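    /// Runs a blocking closure on the index's rayon thread pool and awaits
    /// its result without blocking the async executor.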
    #[cfg(feature = "native")]
    pub async fn spawn_blocking<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.thread_pool.spawn(move || {
            let result = f();
            let _ = tx.send(result);
        });
        rx.await.expect("Thread pool task panicked")
    }

    /// Returns a snapshot of the currently open segment readers.
    pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
        self.segments.read().clone()
    }

    /// Re-reads `segments.json` and swaps in the new segment set, picking up
    /// commits made by a writer since the index was opened.
    pub async fn reload(&self) -> Result<()> {
        let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
        *self.segments.write() = new_segments;
        Ok(())
    }

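    /// Runs `query` against every segment and returns the top `limit`
    /// results ordered by descending score.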
    pub async fn search(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<Vec<crate::query::SearchResult>> {
        let segments = self.segments.read().clone();
        let mut all_results = Vec::new();

        for segment in &segments {
            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
            all_results.extend(results);
        }

        all_results.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_results.truncate(limit);

        Ok(all_results)
    }

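    /// Like [`Index::search`], but returns stable document addresses
    /// (segment id plus segment-local doc id) instead of global doc ids.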
    pub async fn search_with_addresses(
        &self,
        query: &dyn crate::query::Query,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        let segments = self.segments.read().clone();
        let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();

        for segment in &segments {
            let segment_id = segment.meta().id;
            let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
            for result in results {
                all_results.push((segment_id, result));
            }
        }

        all_results.sort_by(|a, b| {
            b.1.score
                .partial_cmp(&a.1.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        all_results.truncate(limit);

        // Note: counted after truncation, so this is at most `limit`.
        let total_hits = all_results.len() as u32;
        let hits: Vec<crate::query::SearchHit> = all_results
            .into_iter()
            .map(|(segment_id, result)| crate::query::SearchHit {
                address: crate::query::DocAddress::new(segment_id, result.doc_id),
                score: result.score,
            })
            .collect();

        Ok(crate::query::SearchResponse { hits, total_hits })
    }

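    /// Resolves a document address produced by
    /// [`Index::search_with_addresses`] back to its document, if the segment
    /// is still part of the index.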
    pub async fn get_document(
        &self,
        address: &crate::query::DocAddress,
    ) -> Result<Option<Document>> {
        let segment_id = address
            .segment_id_u128()
            .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;

        let segments = self.segments.read().clone();
        for segment in &segments {
            if segment.meta().id == segment_id {
                return segment.doc(address.doc_id).await;
            }
        }

        Ok(None)
    }

    /// Fields searched when a query does not name a field explicitly.
    pub fn default_fields(&self) -> &[crate::Field] {
        &self.default_fields
    }

    /// Overrides the default search fields.
    pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
        self.default_fields = fields;
    }

    /// Returns the tokenizer registry used at query time.
    pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
        &self.tokenizers
    }

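    /// Builds a query-language parser for this index. If the schema defines
    /// query-router rules, bare terms are routed to fields according to
    /// those rules; otherwise the parser targets the default fields.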
    pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
        let query_routers = self.schema.query_routers();
        if !query_routers.is_empty() {
            if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
                return crate::dsl::QueryLanguageParser::with_router(
                    Arc::clone(&self.schema),
                    self.default_fields.clone(),
                    Arc::clone(&self.tokenizers),
                    router,
                );
            }
        }

        crate::dsl::QueryLanguageParser::new(
            Arc::clone(&self.schema),
            self.default_fields.clone(),
            Arc::clone(&self.tokenizers),
        )
    }

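    /// Parses `query_str` with this index's query parser and runs the
    /// search, returning addressed hits. A minimal usage sketch (assuming an
    /// index has already been built in `dir`):
    ///
    /// ```ignore
    /// let index = Index::open(dir, IndexConfig::default()).await?;
    /// let response = index.query("title:hello", 10).await?;
    /// for hit in &response.hits {
    ///     let doc = index.get_document(&hit.address).await?;
    /// }
    /// ```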
    pub async fn query(
        &self,
        query_str: &str,
        limit: usize,
    ) -> Result<crate::query::SearchResponse> {
        let parser = self.query_parser();
        let query = parser.parse(query_str).map_err(Error::Query)?;
        self.search_with_addresses(query.as_ref(), limit).await
    }
}

impl<D: Directory> Index<SliceCachingDirectory<D>> {
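    /// Opens an index through a [`SliceCachingDirectory`]. If a previously
    /// saved cache file exists, it is deserialized to prefill the cache
    /// before the index is opened.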
    pub async fn open_with_cache(
        directory: D,
        config: IndexConfig,
        cache_max_bytes: usize,
    ) -> Result<Self> {
        let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);

        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        if let Ok(true) = caching_dir.inner().exists(cache_path).await
            && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
            && let Ok(bytes) = slice.read_bytes().await
        {
            // Best-effort prefill: a missing or stale cache file is ignored.
            let _ = caching_dir.deserialize(bytes.as_slice());
        }

        Self::open(caching_dir, config).await
    }

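    /// Serializes the current slice cache and writes it to the underlying
    /// directory so that later opens can prefill from it.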
    #[cfg(feature = "native")]
    pub async fn save_slice_cache(&self) -> Result<()>
    where
        D: DirectoryWriter,
    {
        let cache_data = self.directory.serialize();
        let cache_path = Path::new(SLICE_CACHE_FILENAME);
        self.directory
            .inner()
            .write(cache_path, &cache_data)
            .await?;
        Ok(())
    }

    /// Returns usage statistics for the slice cache.
    pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
        self.directory.stats()
    }
}

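/// Opens an index through a fresh slice cache and immediately persists the
/// cache, so the slices read while opening (schema, segment list, segment
/// metadata) are already cached on the next cold start.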
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
    let index = Index::open(caching_dir, config).await?;

    index.save_slice_cache().await?;

    Ok(())
}

#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    fn clone(&self) -> Self {
        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(self.segments.read().clone()),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}

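/// Writer half of the index: documents are buffered in per-thread
/// [`SegmentBuilder`]s, flushed to immutable segments on commit, and merged
/// once `merge_threshold` segments accumulate.
///
/// A minimal usage sketch (assuming a built `schema` and a writable `dir`):
///
/// ```ignore
/// let writer = IndexWriter::create(dir, schema, IndexConfig::default()).await?;
/// writer.add_document(doc).await?;
/// writer.commit().await?;
/// ```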
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Per-field tokenizer overrides applied to new segment builders.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// In-memory segment builders, one per indexing thread.
    segment_builders: AsyncMutex<Vec<SegmentBuilder>>,
    /// Round-robin counter used to spread documents across builders.
    next_builder: std::sync::atomic::AtomicUsize,
    /// Hex ids of all committed segments, mirrored in `segments.json`.
    segment_ids: AsyncMutex<Vec<String>>,
}

#[cfg(feature = "native")]
impl<D: DirectoryWriter> IndexWriter<D> {
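    /// Creates a new, empty index in `directory`, writing the schema and an
    /// empty segment list.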
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        Ok(Self {
            directory,
            schema,
            config,
            tokenizers: FxHashMap::default(),
            segment_builders: AsyncMutex::new(Vec::new()),
            next_builder: std::sync::atomic::AtomicUsize::new(0),
            segment_ids: AsyncMutex::new(Vec::new()),
        })
    }

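    /// Opens an existing index for appending, loading its schema and the
    /// current segment list.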
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        Ok(Self {
            directory,
            schema,
            config,
            tokenizers: FxHashMap::default(),
            segment_builders: AsyncMutex::new(Vec::new()),
            next_builder: std::sync::atomic::AtomicUsize::new(0),
            segment_ids: AsyncMutex::new(segment_ids),
        })
    }

    /// Returns the index schema.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Registers a tokenizer override for `field`, applied to segment
    /// builders created after this call.
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

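    /// Adds a document to one of the in-memory segment builders (round
    /// robin). A builder that reaches `max_docs_per_segment` is swapped out
    /// and committed immediately.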
    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
        let num_builders = self.config.num_indexing_threads.max(1);
        let mut builders = self.segment_builders.lock().await;

        // Lazily create the builders on first use, wiring in any tokenizer
        // overrides.
        if builders.is_empty() {
            for _ in 0..num_builders {
                let mut builder = SegmentBuilder::new((*self.schema).clone());
                for (field, tokenizer) in &self.tokenizers {
                    builder.set_tokenizer(*field, tokenizer.clone_box());
                }
                builders.push(builder);
            }
        }

        let idx = self
            .next_builder
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            % num_builders;
        let builder = &mut builders[idx];
        let doc_id = builder.add_document(doc)?;

        if builder.num_docs() >= self.config.max_docs_per_segment {
            // Swap in a fresh builder and commit the full one.
            let full_builder = std::mem::replace(builder, {
                let mut new_builder = SegmentBuilder::new((*self.schema).clone());
                for (field, tokenizer) in &self.tokenizers {
                    new_builder.set_tokenizer(*field, tokenizer.clone_box());
                }
                new_builder
            });
            // Release the lock before the (potentially long) segment build.
            drop(builders);
            self.commit_segment(full_builder).await?;
        }

        Ok(doc_id)
    }

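    /// Flushes every non-empty builder to a new segment (builds run
    /// concurrently on tokio tasks), then appends the new segment ids to
    /// `segments.json`.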
    pub async fn commit(&self) -> Result<()> {
        let mut builders = self.segment_builders.lock().await;

        if builders.is_empty() {
            return Ok(());
        }

        let builders_to_commit: Vec<SegmentBuilder> = std::mem::take(&mut *builders);
        // Release the lock so new documents can be added while we build.
        drop(builders);

        let mut handles = Vec::new();
        for builder in builders_to_commit {
            if builder.num_docs() == 0 {
                continue;
            }

            let directory = Arc::clone(&self.directory);
            let compression_threads = self.config.num_compression_threads;
            let handle = tokio::spawn(async move {
                let segment_id = SegmentId::new();
                builder
                    .build_with_threads(directory.as_ref(), segment_id, compression_threads)
                    .await?;
                Ok::<String, Error>(segment_id.to_hex())
            });
            handles.push(handle);
        }

        let mut new_segment_ids = Vec::new();
        for handle in handles {
            match handle.await {
                Ok(Ok(id)) => new_segment_ids.push(id),
                Ok(Err(e)) => return Err(e),
                Err(e) => return Err(Error::Internal(format!("Task join error: {}", e))),
            }
        }

        if !new_segment_ids.is_empty() {
            let mut segment_ids = self.segment_ids.lock().await;
            segment_ids.extend(new_segment_ids);

            let segments_bytes = serde_json::to_vec(&*segment_ids)
                .map_err(|e| Error::Serialization(e.to_string()))?;
            self.directory
                .write(Path::new("segments.json"), &segments_bytes)
                .await?;
        }

        Ok(())
    }

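    /// Builds a single segment from `builder`, records it in
    /// `segments.json`, and triggers a merge once the segment count reaches
    /// `merge_threshold`.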
    async fn commit_segment(&self, builder: SegmentBuilder) -> Result<()> {
        if builder.num_docs() == 0 {
            return Ok(());
        }

        let segment_id = SegmentId::new();
        builder
            .build_with_threads(
                self.directory.as_ref(),
                segment_id,
                self.config.num_compression_threads,
            )
            .await?;

        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.push(segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        if segment_ids.len() >= self.config.merge_threshold {
            // Release the lock; maybe_merge re-acquires it.
            drop(segment_ids);
            self.maybe_merge().await?;
        }

        Ok(())
    }

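    /// Merges all current segments into one if their count has reached
    /// `merge_threshold`, then deletes the merged-away segments.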
    async fn maybe_merge(&self) -> Result<()> {
        let segment_ids = self.segment_ids.lock().await;

        if segment_ids.len() < self.config.merge_threshold {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Best-effort cleanup of the merged-away segments.
        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }

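    /// Commits any buffered documents, then merges all segments into a
    /// single segment regardless of `merge_threshold`.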
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;

        let segment_ids = self.segment_ids.lock().await;
        if segment_ids.len() <= 1 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        let mut segment_ids = self.segment_ids.lock().await;
        segment_ids.clear();
        segment_ids.push(new_segment_id.to_hex());

        let segments_bytes =
            serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // "world" appears in both documents, all within a single segment.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5,
            merge_threshold: 10,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).await.unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            // High threshold so commits never auto-merge.
            merge_threshold: 100,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        // Doc ids stay stable across the merge.
        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        index.save_slice_cache().await.unwrap();

        assert!(
            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
                .await
                .unwrap()
        );

        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
}