1use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12#[cfg(feature = "native")]
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16#[cfg(feature = "native")]
17use crate::directories::DirectoryWriter;
18use crate::directories::{Directory, SliceCachingDirectory};
19use crate::dsl::{Document, Field, Schema};
20use crate::error::{Error, Result};
21#[cfg(feature = "native")]
22use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentMerger};
23use crate::segment::{SegmentId, SegmentReader};
24use crate::structures::BlockPostingList;
25#[cfg(feature = "native")]
26use crate::tokenizer::BoxedTokenizer;
27
28#[cfg(feature = "native")]
29use rand::Rng;
30#[cfg(feature = "native")]
31use std::sync::atomic::{AtomicUsize, Ordering};
32#[cfg(feature = "native")]
33use tokio::sync::Mutex as AsyncMutex;
34#[cfg(feature = "native")]
35use tokio::sync::mpsc;
36
/// File name, relative to the index directory, under which the serialized
/// slice cache is written by `save_slice_cache` and read back by
/// `open_with_cache`.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
39
/// Tunable settings shared by the index reader (`Index`) and the index
/// writer (`IndexWriter`).
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads in the rayon pool built by `Index::open`.
    pub num_threads: usize,
    /// Number of in-memory segment-builder slots an `IndexWriter` keeps
    /// (values below 1 are treated as 1).
    pub num_indexing_threads: usize,
    /// NOTE(review): not read in this file; presumably the number of threads
    /// used for compression — confirm against its consumers.
    pub num_compression_threads: usize,
    /// Forwarded to `SegmentReader::open` whenever a segment is opened.
    pub term_cache_blocks: usize,
    /// NOTE(review): not read in this file; presumably the document-store
    /// cache size in blocks — confirm against its consumers.
    pub store_cache_blocks: usize,
    /// Document count at which `IndexWriter::add_document` hands a builder
    /// over to a background segment build.
    pub max_docs_per_segment: u32,
    /// NOTE(review): not read in this file; see
    /// `crate::structures::IndexOptimization` for its meaning.
    pub optimization: crate::structures::IndexOptimization,
}
58
59impl Default for IndexConfig {
60 fn default() -> Self {
61 #[cfg(feature = "native")]
62 let cpus = num_cpus::get().max(1);
63 #[cfg(not(feature = "native"))]
64 let cpus = 1;
65
66 Self {
67 num_threads: cpus,
68 num_indexing_threads: 1,
69 num_compression_threads: cpus,
70 term_cache_blocks: 256,
71 store_cache_blocks: 32,
72 max_docs_per_segment: 100_000,
73 optimization: crate::structures::IndexOptimization::default(),
74 }
75 }
76}
77
/// Read-side handle to an index stored in a `Directory`.
///
/// Holds the schema, the currently loaded segment readers and the pieces
/// needed to parse and run queries.
pub struct Index<D: Directory> {
    /// Storage the schema, segment manifest and segments are read from.
    directory: Arc<D>,
    /// Schema deserialized from `schema.json` when the index was opened.
    schema: Arc<Schema>,
    /// Configuration supplied at open time; reused by `reload`.
    config: IndexConfig,
    /// Segment readers in manifest order; replaced wholesale by `reload`.
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    /// Fields handed to the query parser as default search fields.
    default_fields: Vec<crate::Field>,
    /// Tokenizer registry handed to the query parser.
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Rayon pool used by `spawn_blocking` (only with the `native` feature).
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
92
93impl<D: Directory> Index<D> {
94 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
96 let directory = Arc::new(directory);
97
98 let schema_slice = directory.open_read(Path::new("schema.json")).await?;
100 let schema_bytes = schema_slice.read_bytes().await?;
101 let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
102 .map_err(|e| Error::Serialization(e.to_string()))?;
103 let schema = Arc::new(schema);
104
105 let segments = Self::load_segments(&directory, &schema, &config).await?;
107
108 #[cfg(feature = "native")]
109 let thread_pool = {
110 let pool = rayon::ThreadPoolBuilder::new()
111 .num_threads(config.num_threads)
112 .build()
113 .map_err(|e| Error::Io(std::io::Error::other(e)))?;
114 Arc::new(pool)
115 };
116
117 let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
119 schema.default_fields().to_vec()
120 } else {
121 schema
122 .fields()
123 .filter(|(_, entry)| {
124 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
125 })
126 .map(|(field, _)| field)
127 .collect()
128 };
129
130 Ok(Self {
131 directory,
132 schema,
133 config,
134 segments: RwLock::new(segments),
135 default_fields,
136 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
137 #[cfg(feature = "native")]
138 thread_pool,
139 })
140 }
141
142 async fn load_segments(
143 directory: &Arc<D>,
144 schema: &Arc<Schema>,
145 config: &IndexConfig,
146 ) -> Result<Vec<Arc<SegmentReader>>> {
147 let segments_path = Path::new("segments.json");
149 if !directory.exists(segments_path).await? {
150 return Ok(Vec::new());
151 }
152
153 let segments_slice = directory.open_read(segments_path).await?;
154 let segments_bytes = segments_slice.read_bytes().await?;
155 let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
156 .map_err(|e| Error::Serialization(e.to_string()))?;
157
158 let mut segments = Vec::new();
159 let mut doc_id_offset = 0u32;
160
161 for id_str in segment_ids {
162 let segment_id = SegmentId::from_hex(&id_str)
163 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
164 let reader = SegmentReader::open(
165 directory.as_ref(),
166 segment_id,
167 Arc::clone(schema),
168 doc_id_offset,
169 config.term_cache_blocks,
170 )
171 .await?;
172
173 doc_id_offset += reader.meta().num_docs;
174 segments.push(Arc::new(reader));
175 }
176
177 Ok(segments)
178 }
179
180 pub fn schema(&self) -> &Schema {
182 &self.schema
183 }
184
185 pub fn directory(&self) -> &D {
187 &self.directory
188 }
189
190 pub fn num_docs(&self) -> u32 {
192 self.segments.read().iter().map(|s| s.num_docs()).sum()
193 }
194
195 pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
197 let segments = self.segments.read().clone();
198
199 let mut offset = 0u32;
200 for segment in segments.iter() {
201 let segment_docs = segment.meta().num_docs;
202 if doc_id < offset + segment_docs {
203 let local_doc_id = doc_id - offset;
204 return segment.doc(local_doc_id).await;
205 }
206 offset += segment_docs;
207 }
208
209 Ok(None)
210 }
211
212 pub async fn get_postings(
214 &self,
215 field: Field,
216 term: &[u8],
217 ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
218 let segments = self.segments.read().clone();
219 let mut results = Vec::new();
220
221 for segment in segments.iter() {
222 if let Some(postings) = segment.get_postings(field, term).await? {
223 results.push((Arc::clone(segment), postings));
224 }
225 }
226
227 Ok(results)
228 }
229
230 #[cfg(feature = "native")]
232 pub async fn spawn_blocking<F, R>(&self, f: F) -> R
233 where
234 F: FnOnce() -> R + Send + 'static,
235 R: Send + 'static,
236 {
237 let (tx, rx) = tokio::sync::oneshot::channel();
238 self.thread_pool.spawn(move || {
239 let result = f();
240 let _ = tx.send(result);
241 });
242 rx.await.expect("Thread pool task panicked")
243 }
244
245 pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
247 self.segments.read().clone()
248 }
249
250 pub async fn reload(&self) -> Result<()> {
252 let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
253 *self.segments.write() = new_segments;
254 Ok(())
255 }
256
257 pub async fn search(
259 &self,
260 query: &dyn crate::query::Query,
261 limit: usize,
262 ) -> Result<Vec<crate::query::SearchResult>> {
263 let segments = self.segments.read().clone();
264 let mut all_results = Vec::new();
265
266 for segment in &segments {
267 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
268 all_results.extend(results);
269 }
270
271 all_results.sort_by(|a, b| {
273 b.score
274 .partial_cmp(&a.score)
275 .unwrap_or(std::cmp::Ordering::Equal)
276 });
277 all_results.truncate(limit);
278
279 Ok(all_results)
280 }
281
282 pub async fn search_with_addresses(
284 &self,
285 query: &dyn crate::query::Query,
286 limit: usize,
287 ) -> Result<crate::query::SearchResponse> {
288 let segments = self.segments.read().clone();
289 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
290
291 for segment in &segments {
292 let segment_id = segment.meta().id;
293 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
294 for result in results {
295 all_results.push((segment_id, result));
296 }
297 }
298
299 all_results.sort_by(|a, b| {
301 b.1.score
302 .partial_cmp(&a.1.score)
303 .unwrap_or(std::cmp::Ordering::Equal)
304 });
305 all_results.truncate(limit);
306
307 let total_hits = all_results.len() as u32;
308 let hits: Vec<crate::query::SearchHit> = all_results
309 .into_iter()
310 .map(|(segment_id, result)| crate::query::SearchHit {
311 address: crate::query::DocAddress::new(segment_id, result.doc_id),
312 score: result.score,
313 })
314 .collect();
315
316 Ok(crate::query::SearchResponse { hits, total_hits })
317 }
318
319 pub async fn get_document(
321 &self,
322 address: &crate::query::DocAddress,
323 ) -> Result<Option<Document>> {
324 let segment_id = address
325 .segment_id_u128()
326 .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
327
328 let segments = self.segments.read().clone();
329 for segment in &segments {
330 if segment.meta().id == segment_id {
331 return segment.doc(address.doc_id).await;
332 }
333 }
334
335 Ok(None)
336 }
337
338 pub fn default_fields(&self) -> &[crate::Field] {
340 &self.default_fields
341 }
342
343 pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
345 self.default_fields = fields;
346 }
347
348 pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
350 &self.tokenizers
351 }
352
353 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
358 let query_routers = self.schema.query_routers();
360 if !query_routers.is_empty() {
361 if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
363 return crate::dsl::QueryLanguageParser::with_router(
364 Arc::clone(&self.schema),
365 self.default_fields.clone(),
366 Arc::clone(&self.tokenizers),
367 router,
368 );
369 }
370 }
371
372 crate::dsl::QueryLanguageParser::new(
374 Arc::clone(&self.schema),
375 self.default_fields.clone(),
376 Arc::clone(&self.tokenizers),
377 )
378 }
379
380 pub async fn query(
386 &self,
387 query_str: &str,
388 limit: usize,
389 ) -> Result<crate::query::SearchResponse> {
390 let parser = self.query_parser();
391 let query = parser.parse(query_str).map_err(Error::Query)?;
392 self.search_with_addresses(query.as_ref(), limit).await
393 }
394}
395
396impl<D: Directory> Index<SliceCachingDirectory<D>> {
398 pub async fn open_with_cache(
403 directory: D,
404 config: IndexConfig,
405 cache_max_bytes: usize,
406 ) -> Result<Self> {
407 let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
408
409 let cache_path = Path::new(SLICE_CACHE_FILENAME);
411 if let Ok(true) = caching_dir.inner().exists(cache_path).await
412 && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
413 && let Ok(bytes) = slice.read_bytes().await
414 {
415 let _ = caching_dir.deserialize(bytes.as_slice());
416 }
417
418 Self::open(caching_dir, config).await
419 }
420
421 #[cfg(feature = "native")]
426 pub async fn save_slice_cache(&self) -> Result<()>
427 where
428 D: DirectoryWriter,
429 {
430 let cache_data = self.directory.serialize();
431 let cache_path = Path::new(SLICE_CACHE_FILENAME);
432 self.directory
433 .inner()
434 .write(cache_path, &cache_data)
435 .await?;
436 Ok(())
437 }
438
439 pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
441 self.directory.stats()
442 }
443}
444
/// Opens the index in `directory` through a fresh slice cache of at most
/// `cache_max_bytes` and then saves that cache next to the index.
///
/// NOTE(review): nothing is done between opening and saving, so presumably
/// the saved cache holds only what `Index::open` itself read — confirm that
/// no extra warm-up step is expected here.
///
/// # Errors
///
/// Propagates errors from opening the index and from writing the cache.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let cached_directory = SliceCachingDirectory::new(directory, cache_max_bytes);
    let warmed_index = Index::open(cached_directory, config).await?;
    warmed_index.save_slice_cache().await
}
471
#[cfg(feature = "native")]
impl<D: Directory> Clone for Index<D> {
    /// Creates a handle sharing the directory, schema, tokenizers, thread
    /// pool and segment readers of `self`.
    ///
    /// The list of readers is copied into a new lock, so a later `reload` on
    /// one handle does not change what the other handle sees.
    fn clone(&self) -> Self {
        let segment_snapshot = self.segments.read().to_vec();

        Self {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
            config: self.config.clone(),
            default_fields: self.default_fields.clone(),
            segments: RwLock::new(segment_snapshot),
        }
    }
}
486
/// Write-side handle to an index: buffers documents in in-memory segment
/// builders, builds segments in background tasks and maintains the
/// `segments.json` manifest.
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    /// Storage the schema, manifest and segments are written to.
    directory: Arc<D>,
    /// Schema every segment builder is created with.
    schema: Arc<Schema>,
    /// Writer configuration (builder count, segment size, reader cache size).
    config: IndexConfig,
    /// Configuration cloned into every new `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers copied into each newly created builder.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Builder slots; `add_document` picks one at random. A slot is `None`
    /// until a document is added to it and again after it is handed off.
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
    /// Hex ids of the segments making up the index; persisted to
    /// `segments.json` by `commit` and after a merge.
    segment_ids: Arc<AsyncMutex<Vec<String>>>,
    /// Cloned into background build tasks, which send the hex id of each
    /// segment they finish.
    segment_id_sender: mpsc::UnboundedSender<String>,
    /// Receiving end of the completion channel, drained by `flush`/`commit`.
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Background builds started whose completion message has not yet been
    /// received.
    pending_builds: AtomicUsize,
}
512
513#[cfg(feature = "native")]
514impl<D: DirectoryWriter> IndexWriter<D> {
515 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
517 Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
518 }
519
520 pub async fn create_with_config(
522 directory: D,
523 schema: Schema,
524 config: IndexConfig,
525 builder_config: SegmentBuilderConfig,
526 ) -> Result<Self> {
527 let directory = Arc::new(directory);
528 let schema = Arc::new(schema);
529
530 let schema_bytes =
532 serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
533 directory
534 .write(Path::new("schema.json"), &schema_bytes)
535 .await?;
536
537 let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
539 .map_err(|e| Error::Serialization(e.to_string()))?;
540 directory
541 .write(Path::new("segments.json"), &segments_bytes)
542 .await?;
543
544 let num_builders = config.num_indexing_threads.max(1);
546 let mut builders = Vec::with_capacity(num_builders);
547 for _ in 0..num_builders {
548 builders.push(AsyncMutex::new(None));
549 }
550
551 let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
553
554 Ok(Self {
555 directory,
556 schema,
557 config,
558 builder_config,
559 tokenizers: FxHashMap::default(),
560 builders,
561 segment_ids: Arc::new(AsyncMutex::new(Vec::new())),
562 segment_id_sender,
563 segment_id_receiver: AsyncMutex::new(segment_id_receiver),
564 pending_builds: AtomicUsize::new(0),
565 })
566 }
567
568 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
570 Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
571 }
572
573 pub async fn open_with_config(
575 directory: D,
576 config: IndexConfig,
577 builder_config: SegmentBuilderConfig,
578 ) -> Result<Self> {
579 let directory = Arc::new(directory);
580
581 let schema_slice = directory.open_read(Path::new("schema.json")).await?;
583 let schema_bytes = schema_slice.read_bytes().await?;
584 let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
585 .map_err(|e| Error::Serialization(e.to_string()))?;
586 let schema = Arc::new(schema);
587
588 let segments_slice = directory.open_read(Path::new("segments.json")).await?;
590 let segments_bytes = segments_slice.read_bytes().await?;
591 let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
592 .map_err(|e| Error::Serialization(e.to_string()))?;
593
594 let num_builders = config.num_indexing_threads.max(1);
596 let mut builders = Vec::with_capacity(num_builders);
597 for _ in 0..num_builders {
598 builders.push(AsyncMutex::new(None));
599 }
600
601 let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();
603
604 Ok(Self {
605 directory,
606 schema,
607 config,
608 builder_config,
609 tokenizers: FxHashMap::default(),
610 builders,
611 segment_ids: Arc::new(AsyncMutex::new(segment_ids)),
612 segment_id_sender,
613 segment_id_receiver: AsyncMutex::new(segment_id_receiver),
614 pending_builds: AtomicUsize::new(0),
615 })
616 }
617
618 pub fn schema(&self) -> &Schema {
620 &self.schema
621 }
622
623 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
625 self.tokenizers.insert(field, Box::new(tokenizer));
626 }
627
628 pub async fn add_document(&self, doc: Document) -> Result<DocId> {
634 let builder_idx = rand::rng().random_range(0..self.builders.len());
636
637 let mut builder_guard = self.builders[builder_idx].lock().await;
638
639 if builder_guard.is_none() {
641 let mut builder =
642 SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
643 for (field, tokenizer) in &self.tokenizers {
644 builder.set_tokenizer(*field, tokenizer.clone_box());
645 }
646 *builder_guard = Some(builder);
647 }
648
649 let builder = builder_guard.as_mut().unwrap();
650 let doc_id = builder.add_document(doc)?;
651
652 if builder.num_docs() >= self.config.max_docs_per_segment {
654 let full_builder = builder_guard.take().unwrap();
655 drop(builder_guard); self.spawn_background_build(full_builder);
657 }
658
659 Ok(doc_id)
660 }
661
662 fn spawn_background_build(&self, builder: SegmentBuilder) {
667 let directory = Arc::clone(&self.directory);
668 let segment_id = SegmentId::new();
669 let segment_hex = segment_id.to_hex();
670 let sender = self.segment_id_sender.clone();
671 let segment_ids = Arc::clone(&self.segment_ids);
672
673 self.pending_builds.fetch_add(1, Ordering::SeqCst);
674
675 tokio::spawn(async move {
677 match builder.build(directory.as_ref(), segment_id).await {
678 Ok(_) => {
679 let mut ids = segment_ids.lock().await;
681 ids.push(segment_hex.clone());
682 let _ = sender.send(segment_hex);
684 }
685 Err(e) => {
686 eprintln!("Background segment build failed: {:?}", e);
688 }
689 }
690 });
691 }
692
693 async fn collect_completed_segments(&self) {
695 let mut receiver = self.segment_id_receiver.lock().await;
696 while let Ok(_segment_hex) = receiver.try_recv() {
697 self.pending_builds.fetch_sub(1, Ordering::SeqCst);
699 }
700 }
701
702 pub fn pending_build_count(&self) -> usize {
704 self.pending_builds.load(Ordering::SeqCst)
705 }
706
707 pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
709 let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;
710
711 for builder_mutex in &self.builders {
712 let guard = builder_mutex.lock().await;
713 if let Some(builder) = guard.as_ref() {
714 let stats = builder.stats();
715 if let Some(ref mut total) = total_stats {
716 total.num_docs += stats.num_docs;
717 total.unique_terms += stats.unique_terms;
718 total.postings_in_memory += stats.postings_in_memory;
719 total.interned_strings += stats.interned_strings;
720 total.doc_field_lengths_size += stats.doc_field_lengths_size;
721 } else {
722 total_stats = Some(stats);
723 }
724 }
725 }
726
727 total_stats
728 }
729
730 pub async fn flush(&self) -> Result<()> {
736 self.collect_completed_segments().await;
738
739 for builder_mutex in &self.builders {
741 let mut guard = builder_mutex.lock().await;
742 if let Some(builder) = guard.take()
743 && builder.num_docs() > 0
744 {
745 self.spawn_background_build(builder);
746 }
747 }
748
749 Ok(())
750 }
751
752 pub async fn commit(&self) -> Result<()> {
757 self.flush().await?;
759
760 let mut receiver = self.segment_id_receiver.lock().await;
762 while self.pending_builds.load(Ordering::SeqCst) > 0 {
763 match receiver.recv().await {
764 Some(_segment_hex) => {
765 self.pending_builds.fetch_sub(1, Ordering::SeqCst);
766 }
767 None => break, }
769 }
770 drop(receiver);
771
772 let segment_ids = self.segment_ids.lock().await;
774 let segments_bytes =
775 serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
776 self.directory
777 .write(Path::new("segments.json"), &segments_bytes)
778 .await?;
779
780 Ok(())
781 }
782
783 async fn do_merge(&self) -> Result<()> {
785 let segment_ids = self.segment_ids.lock().await;
786
787 if segment_ids.len() < 2 {
788 return Ok(());
789 }
790
791 let ids_to_merge: Vec<String> = segment_ids.clone();
792 drop(segment_ids);
793
794 let mut readers = Vec::new();
796 let mut doc_offset = 0u32;
797
798 for id_str in &ids_to_merge {
799 let segment_id = SegmentId::from_hex(id_str)
800 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
801 let reader = SegmentReader::open(
802 self.directory.as_ref(),
803 segment_id,
804 Arc::clone(&self.schema),
805 doc_offset,
806 self.config.term_cache_blocks,
807 )
808 .await?;
809 doc_offset += reader.meta().num_docs;
810 readers.push(reader);
811 }
812
813 let merger = SegmentMerger::new(Arc::clone(&self.schema));
815 let new_segment_id = SegmentId::new();
816 merger
817 .merge(self.directory.as_ref(), &readers, new_segment_id)
818 .await?;
819
820 let mut segment_ids = self.segment_ids.lock().await;
822 segment_ids.clear();
823 segment_ids.push(new_segment_id.to_hex());
824
825 let segments_bytes =
826 serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
827 self.directory
828 .write(Path::new("segments.json"), &segments_bytes)
829 .await?;
830
831 for id_str in ids_to_merge {
833 if let Some(segment_id) = SegmentId::from_hex(&id_str) {
834 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
835 }
836 }
837
838 Ok(())
839 }
840
841 pub async fn force_merge(&self) -> Result<()> {
843 self.commit().await?;
845 self.do_merge().await
847 }
848}
849
850#[cfg(test)]
851mod tests {
852 use super::*;
853 use crate::directories::RamDirectory;
854 use crate::dsl::SchemaBuilder;
855
856 #[tokio::test]
857 async fn test_index_create_and_search() {
858 let mut schema_builder = SchemaBuilder::default();
859 let title = schema_builder.add_text_field("title", true, true);
860 let body = schema_builder.add_text_field("body", true, true);
861 let schema = schema_builder.build();
862
863 let dir = RamDirectory::new();
864 let config = IndexConfig::default();
865
866 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
868 .await
869 .unwrap();
870
871 let mut doc1 = Document::new();
872 doc1.add_text(title, "Hello World");
873 doc1.add_text(body, "This is the first document");
874 writer.add_document(doc1).await.unwrap();
875
876 let mut doc2 = Document::new();
877 doc2.add_text(title, "Goodbye World");
878 doc2.add_text(body, "This is the second document");
879 writer.add_document(doc2).await.unwrap();
880
881 writer.commit().await.unwrap();
882
883 let index = Index::open(dir, config).await.unwrap();
885 assert_eq!(index.num_docs(), 2);
886
887 let postings = index.get_postings(title, b"world").await.unwrap();
889 assert_eq!(postings.len(), 1); assert_eq!(postings[0].1.doc_count(), 2); let doc = index.doc(0).await.unwrap().unwrap();
894 assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
895 }
896
897 #[tokio::test]
898 async fn test_multiple_segments() {
899 let mut schema_builder = SchemaBuilder::default();
900 let title = schema_builder.add_text_field("title", true, true);
901 let schema = schema_builder.build();
902
903 let dir = RamDirectory::new();
904 let config = IndexConfig {
905 max_docs_per_segment: 5, ..Default::default()
907 };
908
909 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
910 .await
911 .unwrap();
912
913 for batch in 0..3 {
915 for i in 0..5 {
916 let mut doc = Document::new();
917 doc.add_text(title, format!("Document {} batch {}", i, batch));
918 writer.add_document(doc).await.unwrap();
919 }
920 writer.commit().await.unwrap();
921 }
922
923 let index = Index::open(dir, config).await.unwrap();
925 assert_eq!(index.num_docs(), 15);
926 assert_eq!(index.segment_readers().len(), 3);
927 }
928
929 #[tokio::test]
930 async fn test_segment_merge() {
931 let mut schema_builder = SchemaBuilder::default();
932 let title = schema_builder.add_text_field("title", true, true);
933 let schema = schema_builder.build();
934
935 let dir = RamDirectory::new();
936 let config = IndexConfig {
937 max_docs_per_segment: 3,
938 ..Default::default()
939 };
940
941 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
942 .await
943 .unwrap();
944
945 for i in 0..9 {
947 let mut doc = Document::new();
948 doc.add_text(title, format!("Document {}", i));
949 writer.add_document(doc).await.unwrap();
950 }
951 writer.commit().await.unwrap();
952
953 let index = Index::open(dir.clone(), config.clone()).await.unwrap();
955 assert_eq!(index.segment_readers().len(), 3);
956
957 let writer = IndexWriter::open(dir.clone(), config.clone())
959 .await
960 .unwrap();
961 writer.force_merge().await.unwrap();
962
963 let index = Index::open(dir, config).await.unwrap();
965 assert_eq!(index.segment_readers().len(), 1);
966 assert_eq!(index.num_docs(), 9);
967
968 for i in 0..9 {
970 let doc = index.doc(i).await.unwrap().unwrap();
971 assert_eq!(
972 doc.get_first(title).unwrap().as_text(),
973 Some(format!("Document {}", i).as_str())
974 );
975 }
976 }
977
978 #[tokio::test]
979 async fn test_match_query() {
980 let mut schema_builder = SchemaBuilder::default();
981 let title = schema_builder.add_text_field("title", true, true);
982 let body = schema_builder.add_text_field("body", true, true);
983 let schema = schema_builder.build();
984
985 let dir = RamDirectory::new();
986 let config = IndexConfig::default();
987
988 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
989 .await
990 .unwrap();
991
992 let mut doc1 = Document::new();
993 doc1.add_text(title, "rust programming");
994 doc1.add_text(body, "Learn rust language");
995 writer.add_document(doc1).await.unwrap();
996
997 let mut doc2 = Document::new();
998 doc2.add_text(title, "python programming");
999 doc2.add_text(body, "Learn python language");
1000 writer.add_document(doc2).await.unwrap();
1001
1002 writer.commit().await.unwrap();
1003
1004 let index = Index::open(dir, config).await.unwrap();
1005
1006 let results = index.query("rust", 10).await.unwrap();
1008 assert_eq!(results.hits.len(), 1);
1009
1010 let results = index.query("rust programming", 10).await.unwrap();
1012 assert!(!results.hits.is_empty());
1013
1014 let hit = &results.hits[0];
1016 assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");
1017
1018 let doc = index.get_document(&hit.address).await.unwrap().unwrap();
1020 assert!(
1021 !doc.field_values().is_empty(),
1022 "Doc should have field values"
1023 );
1024
1025 let doc = index.doc(0).await.unwrap().unwrap();
1027 assert!(
1028 !doc.field_values().is_empty(),
1029 "Doc should have field values"
1030 );
1031 }
1032
1033 #[tokio::test]
1034 async fn test_slice_cache_warmup_and_load() {
1035 use crate::directories::SliceCachingDirectory;
1036
1037 let mut schema_builder = SchemaBuilder::default();
1038 let title = schema_builder.add_text_field("title", true, true);
1039 let body = schema_builder.add_text_field("body", true, true);
1040 let schema = schema_builder.build();
1041
1042 let dir = RamDirectory::new();
1043 let config = IndexConfig::default();
1044
1045 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1047 .await
1048 .unwrap();
1049
1050 for i in 0..10 {
1051 let mut doc = Document::new();
1052 doc.add_text(title, format!("Document {} about rust", i));
1053 doc.add_text(body, format!("This is body text number {}", i));
1054 writer.add_document(doc).await.unwrap();
1055 }
1056 writer.commit().await.unwrap();
1057
1058 let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
1060 let index = Index::open(caching_dir, config.clone()).await.unwrap();
1061
1062 let results = index.query("rust", 10).await.unwrap();
1064 assert!(!results.hits.is_empty());
1065
1066 let stats = index.slice_cache_stats();
1068 assert!(stats.total_bytes > 0, "Cache should have data after search");
1069
1070 index.save_slice_cache().await.unwrap();
1072
1073 assert!(
1075 dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
1076 .await
1077 .unwrap()
1078 );
1079
1080 let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
1082 .await
1083 .unwrap();
1084
1085 let stats2 = index2.slice_cache_stats();
1087 assert!(
1088 stats2.total_bytes > 0,
1089 "Cache should be prefilled from file"
1090 );
1091
1092 let results2 = index2.query("rust", 10).await.unwrap();
1094 assert_eq!(results.hits.len(), results2.hits.len());
1095 }
1096
1097 #[tokio::test]
1098 async fn test_multivalue_field_indexing_and_search() {
1099 let mut schema_builder = SchemaBuilder::default();
1100 let uris = schema_builder.add_text_field("uris", true, true);
1101 let title = schema_builder.add_text_field("title", true, true);
1102 let schema = schema_builder.build();
1103
1104 let dir = RamDirectory::new();
1105 let config = IndexConfig::default();
1106
1107 let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
1109 .await
1110 .unwrap();
1111
1112 let mut doc = Document::new();
1113 doc.add_text(uris, "one");
1114 doc.add_text(uris, "two");
1115 doc.add_text(title, "Test Document");
1116 writer.add_document(doc).await.unwrap();
1117
1118 let mut doc2 = Document::new();
1120 doc2.add_text(uris, "three");
1121 doc2.add_text(title, "Another Document");
1122 writer.add_document(doc2).await.unwrap();
1123
1124 writer.commit().await.unwrap();
1125
1126 let index = Index::open(dir, config).await.unwrap();
1128 assert_eq!(index.num_docs(), 2);
1129
1130 let doc = index.doc(0).await.unwrap().unwrap();
1132 let all_uris: Vec<_> = doc.get_all(uris).collect();
1133 assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
1134 assert_eq!(all_uris[0].as_text(), Some("one"));
1135 assert_eq!(all_uris[1].as_text(), Some("two"));
1136
1137 let json = doc.to_json(index.schema());
1139 let uris_json = json.get("uris").unwrap();
1140 assert!(uris_json.is_array(), "Multi-value field should be an array");
1141 let uris_arr = uris_json.as_array().unwrap();
1142 assert_eq!(uris_arr.len(), 2);
1143 assert_eq!(uris_arr[0].as_str(), Some("one"));
1144 assert_eq!(uris_arr[1].as_str(), Some("two"));
1145
1146 let results = index.query("uris:one", 10).await.unwrap();
1148 assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
1149 assert_eq!(results.hits[0].address.doc_id, 0);
1150
1151 let results = index.query("uris:two", 10).await.unwrap();
1152 assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
1153 assert_eq!(results.hits[0].address.doc_id, 0);
1154
1155 let results = index.query("uris:three", 10).await.unwrap();
1156 assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
1157 assert_eq!(results.hits[0].address.doc_id, 1);
1158
1159 let results = index.query("uris:nonexistent", 10).await.unwrap();
1161 assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
1162 }
1163}