1use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12#[cfg(feature = "native")]
13use rustc_hash::FxHashMap;
14
15use crate::DocId;
16#[cfg(feature = "native")]
17use crate::directories::DirectoryWriter;
18use crate::directories::{Directory, SliceCachingDirectory};
19use crate::dsl::{Document, Field, Schema};
20use crate::error::{Error, Result};
21#[cfg(feature = "native")]
22use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentMerger};
23use crate::segment::{SegmentId, SegmentReader};
24use crate::structures::BlockPostingList;
25#[cfg(feature = "native")]
26use crate::tokenizer::BoxedTokenizer;
27
28#[cfg(feature = "native")]
29use rand::Rng;
30#[cfg(feature = "native")]
31use tokio::sync::Mutex as AsyncMutex;
32#[cfg(feature = "native")]
33use tokio::task::JoinHandle;
34
/// File name of the serialized slice cache.
///
/// `Index::save_slice_cache` writes it into the wrapped (inner) directory of a
/// `SliceCachingDirectory`, and `Index::open_with_cache` reads it back from there
/// to pre-fill the cache.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
37
/// Tuning knobs shared by [`Index`] (reading) and `IndexWriter` (writing).
#[derive(Debug, Clone)]
pub struct IndexConfig {
    /// Number of threads in the rayon pool that `Index::open` builds on native builds.
    pub num_threads: usize,
    /// Number of independent in-memory segment builders used by `IndexWriter`
    /// (values below 1 are treated as 1).
    pub num_indexing_threads: usize,
    /// NOTE(review): not read anywhere in this file; presumably consumed by the
    /// segment/compression code — confirm.
    pub num_compression_threads: usize,
    /// Passed to `SegmentReader::open` for every segment that is opened.
    pub term_cache_blocks: usize,
    /// NOTE(review): not read anywhere in this file; presumably sizes a document
    /// store cache elsewhere — confirm.
    pub store_cache_blocks: usize,
    /// A writer's builder is flushed to a new segment once it holds this many documents.
    pub max_docs_per_segment: u32,
    /// NOTE(review): not read anywhere in this file; consumed by other modules.
    pub optimization: crate::structures::IndexOptimization,
}
56
57impl Default for IndexConfig {
58 fn default() -> Self {
59 #[cfg(feature = "native")]
60 let cpus = num_cpus::get().max(1);
61 #[cfg(not(feature = "native"))]
62 let cpus = 1;
63
64 Self {
65 num_threads: cpus,
66 num_indexing_threads: 1,
67 num_compression_threads: cpus,
68 term_cache_blocks: 256,
69 store_cache_blocks: 32,
70 max_docs_per_segment: 100_000,
71 optimization: crate::structures::IndexOptimization::default(),
72 }
73 }
74}
75
/// Read-side handle to an index stored in a [`Directory`].
///
/// Holds the deserialized schema and a snapshot of the opened segment readers.
/// The snapshot is refreshed by `reload`.
pub struct Index<D: Directory> {
    /// Storage backend; shared with clones of this index.
    directory: Arc<D>,
    /// Schema read from `schema.json`.
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Segment readers in `segments.json` order. Doc ids are assigned by walking
    /// this list, so its order defines the global doc-id space.
    segments: RwLock<Vec<Arc<SegmentReader>>>,
    /// Fields searched by the query parser when a query names no field.
    default_fields: Vec<crate::Field>,
    tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
    /// Pool used by `spawn_blocking` (native builds only).
    #[cfg(feature = "native")]
    thread_pool: Arc<rayon::ThreadPool>,
}
90
91impl<D: Directory> Index<D> {
92 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
94 let directory = Arc::new(directory);
95
96 let schema_slice = directory.open_read(Path::new("schema.json")).await?;
98 let schema_bytes = schema_slice.read_bytes().await?;
99 let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
100 .map_err(|e| Error::Serialization(e.to_string()))?;
101 let schema = Arc::new(schema);
102
103 let segments = Self::load_segments(&directory, &schema, &config).await?;
105
106 #[cfg(feature = "native")]
107 let thread_pool = {
108 let pool = rayon::ThreadPoolBuilder::new()
109 .num_threads(config.num_threads)
110 .build()
111 .map_err(|e| Error::Io(std::io::Error::other(e)))?;
112 Arc::new(pool)
113 };
114
115 let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
117 schema.default_fields().to_vec()
118 } else {
119 schema
120 .fields()
121 .filter(|(_, entry)| {
122 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
123 })
124 .map(|(field, _)| field)
125 .collect()
126 };
127
128 Ok(Self {
129 directory,
130 schema,
131 config,
132 segments: RwLock::new(segments),
133 default_fields,
134 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
135 #[cfg(feature = "native")]
136 thread_pool,
137 })
138 }
139
140 async fn load_segments(
141 directory: &Arc<D>,
142 schema: &Arc<Schema>,
143 config: &IndexConfig,
144 ) -> Result<Vec<Arc<SegmentReader>>> {
145 let segments_path = Path::new("segments.json");
147 if !directory.exists(segments_path).await? {
148 return Ok(Vec::new());
149 }
150
151 let segments_slice = directory.open_read(segments_path).await?;
152 let segments_bytes = segments_slice.read_bytes().await?;
153 let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
154 .map_err(|e| Error::Serialization(e.to_string()))?;
155
156 let mut segments = Vec::new();
157 let mut doc_id_offset = 0u32;
158
159 for id_str in segment_ids {
160 let segment_id = SegmentId::from_hex(&id_str)
161 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
162 let reader = SegmentReader::open(
163 directory.as_ref(),
164 segment_id,
165 Arc::clone(schema),
166 doc_id_offset,
167 config.term_cache_blocks,
168 )
169 .await?;
170
171 doc_id_offset += reader.meta().num_docs;
172 segments.push(Arc::new(reader));
173 }
174
175 Ok(segments)
176 }
177
178 pub fn schema(&self) -> &Schema {
180 &self.schema
181 }
182
183 pub fn directory(&self) -> &D {
185 &self.directory
186 }
187
188 pub fn num_docs(&self) -> u32 {
190 self.segments.read().iter().map(|s| s.num_docs()).sum()
191 }
192
193 pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
195 let segments = self.segments.read().clone();
196
197 let mut offset = 0u32;
198 for segment in segments.iter() {
199 let segment_docs = segment.meta().num_docs;
200 if doc_id < offset + segment_docs {
201 let local_doc_id = doc_id - offset;
202 return segment.doc(local_doc_id).await;
203 }
204 offset += segment_docs;
205 }
206
207 Ok(None)
208 }
209
210 pub async fn get_postings(
212 &self,
213 field: Field,
214 term: &[u8],
215 ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
216 let segments = self.segments.read().clone();
217 let mut results = Vec::new();
218
219 for segment in segments.iter() {
220 if let Some(postings) = segment.get_postings(field, term).await? {
221 results.push((Arc::clone(segment), postings));
222 }
223 }
224
225 Ok(results)
226 }
227
228 #[cfg(feature = "native")]
230 pub async fn spawn_blocking<F, R>(&self, f: F) -> R
231 where
232 F: FnOnce() -> R + Send + 'static,
233 R: Send + 'static,
234 {
235 let (tx, rx) = tokio::sync::oneshot::channel();
236 self.thread_pool.spawn(move || {
237 let result = f();
238 let _ = tx.send(result);
239 });
240 rx.await.expect("Thread pool task panicked")
241 }
242
243 pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
245 self.segments.read().clone()
246 }
247
248 pub async fn reload(&self) -> Result<()> {
250 let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
251 *self.segments.write() = new_segments;
252 Ok(())
253 }
254
255 pub async fn search(
257 &self,
258 query: &dyn crate::query::Query,
259 limit: usize,
260 ) -> Result<Vec<crate::query::SearchResult>> {
261 let segments = self.segments.read().clone();
262 let mut all_results = Vec::new();
263
264 for segment in &segments {
265 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
266 all_results.extend(results);
267 }
268
269 all_results.sort_by(|a, b| {
271 b.score
272 .partial_cmp(&a.score)
273 .unwrap_or(std::cmp::Ordering::Equal)
274 });
275 all_results.truncate(limit);
276
277 Ok(all_results)
278 }
279
280 pub async fn search_with_addresses(
282 &self,
283 query: &dyn crate::query::Query,
284 limit: usize,
285 ) -> Result<crate::query::SearchResponse> {
286 let segments = self.segments.read().clone();
287 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
288
289 for segment in &segments {
290 let segment_id = segment.meta().id;
291 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
292 for result in results {
293 all_results.push((segment_id, result));
294 }
295 }
296
297 all_results.sort_by(|a, b| {
299 b.1.score
300 .partial_cmp(&a.1.score)
301 .unwrap_or(std::cmp::Ordering::Equal)
302 });
303 all_results.truncate(limit);
304
305 let total_hits = all_results.len() as u32;
306 let hits: Vec<crate::query::SearchHit> = all_results
307 .into_iter()
308 .map(|(segment_id, result)| crate::query::SearchHit {
309 address: crate::query::DocAddress::new(segment_id, result.doc_id),
310 score: result.score,
311 })
312 .collect();
313
314 Ok(crate::query::SearchResponse { hits, total_hits })
315 }
316
317 pub async fn get_document(
319 &self,
320 address: &crate::query::DocAddress,
321 ) -> Result<Option<Document>> {
322 let segment_id = address
323 .segment_id_u128()
324 .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
325
326 let segments = self.segments.read().clone();
327 for segment in &segments {
328 if segment.meta().id == segment_id {
329 return segment.doc(address.doc_id).await;
330 }
331 }
332
333 Ok(None)
334 }
335
336 pub fn default_fields(&self) -> &[crate::Field] {
338 &self.default_fields
339 }
340
341 pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
343 self.default_fields = fields;
344 }
345
346 pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
348 &self.tokenizers
349 }
350
351 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
356 let query_routers = self.schema.query_routers();
358 if !query_routers.is_empty() {
359 if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
361 return crate::dsl::QueryLanguageParser::with_router(
362 Arc::clone(&self.schema),
363 self.default_fields.clone(),
364 Arc::clone(&self.tokenizers),
365 router,
366 );
367 }
368 }
369
370 crate::dsl::QueryLanguageParser::new(
372 Arc::clone(&self.schema),
373 self.default_fields.clone(),
374 Arc::clone(&self.tokenizers),
375 )
376 }
377
378 pub async fn query(
384 &self,
385 query_str: &str,
386 limit: usize,
387 ) -> Result<crate::query::SearchResponse> {
388 let parser = self.query_parser();
389 let query = parser.parse(query_str).map_err(Error::Query)?;
390 self.search_with_addresses(query.as_ref(), limit).await
391 }
392}
393
394impl<D: Directory> Index<SliceCachingDirectory<D>> {
396 pub async fn open_with_cache(
401 directory: D,
402 config: IndexConfig,
403 cache_max_bytes: usize,
404 ) -> Result<Self> {
405 let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
406
407 let cache_path = Path::new(SLICE_CACHE_FILENAME);
409 if let Ok(true) = caching_dir.inner().exists(cache_path).await
410 && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
411 && let Ok(bytes) = slice.read_bytes().await
412 {
413 let _ = caching_dir.deserialize(bytes.as_slice());
414 }
415
416 Self::open(caching_dir, config).await
417 }
418
419 #[cfg(feature = "native")]
424 pub async fn save_slice_cache(&self) -> Result<()>
425 where
426 D: DirectoryWriter,
427 {
428 let cache_data = self.directory.serialize();
429 let cache_path = Path::new(SLICE_CACHE_FILENAME);
430 self.directory
431 .inner()
432 .write(cache_path, &cache_data)
433 .await?;
434 Ok(())
435 }
436
437 pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
439 self.directory.stats()
440 }
441}
442
/// Opens the index in `directory` behind a slice cache of `cache_max_bytes` and
/// immediately persists that cache to `index.slicecache`.
///
/// The saved cache holds whatever slices `Index::open` read: the schema, the
/// segment manifest, and whatever `SegmentReader::open` touches for each segment.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let warmed = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;
    warmed.save_slice_cache().await
}
469
470#[cfg(feature = "native")]
471impl<D: Directory> Clone for Index<D> {
472 fn clone(&self) -> Self {
473 Self {
474 directory: Arc::clone(&self.directory),
475 schema: Arc::clone(&self.schema),
476 config: self.config.clone(),
477 segments: RwLock::new(self.segments.read().clone()),
478 default_fields: self.default_fields.clone(),
479 tokenizers: Arc::clone(&self.tokenizers),
480 thread_pool: Arc::clone(&self.thread_pool),
481 }
482 }
483}
484
/// Write-side handle: buffers documents in in-memory segment builders and
/// publishes finished segments through `segments.json`.
#[cfg(feature = "native")]
pub struct IndexWriter<D: DirectoryWriter> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    /// Configuration handed to every newly created `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizer overrides, copied into each builder when it is created.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Builder slots (`num_indexing_threads`, at least one); `None` until a
    /// document is first added to the slot or after its builder was flushed.
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
    /// Hex ids of committed segments, mirrored to `segments.json` on every change.
    segment_ids: AsyncMutex<Vec<String>>,
    /// Builds started when a builder filled up; each resolves to the new
    /// segment's hex id and is registered on the next commit.
    background_builds: AsyncMutex<Vec<JoinHandle<Result<String>>>>,
}
506
507#[cfg(feature = "native")]
508impl<D: DirectoryWriter> IndexWriter<D> {
509 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
511 Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
512 }
513
514 pub async fn create_with_config(
516 directory: D,
517 schema: Schema,
518 config: IndexConfig,
519 builder_config: SegmentBuilderConfig,
520 ) -> Result<Self> {
521 let directory = Arc::new(directory);
522 let schema = Arc::new(schema);
523
524 let schema_bytes =
526 serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
527 directory
528 .write(Path::new("schema.json"), &schema_bytes)
529 .await?;
530
531 let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
533 .map_err(|e| Error::Serialization(e.to_string()))?;
534 directory
535 .write(Path::new("segments.json"), &segments_bytes)
536 .await?;
537
538 let num_builders = config.num_indexing_threads.max(1);
540 let mut builders = Vec::with_capacity(num_builders);
541 for _ in 0..num_builders {
542 builders.push(AsyncMutex::new(None));
543 }
544
545 Ok(Self {
546 directory,
547 schema,
548 config,
549 builder_config,
550 tokenizers: FxHashMap::default(),
551 builders,
552 segment_ids: AsyncMutex::new(Vec::new()),
553 background_builds: AsyncMutex::new(Vec::new()),
554 })
555 }
556
557 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
559 Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
560 }
561
562 pub async fn open_with_config(
564 directory: D,
565 config: IndexConfig,
566 builder_config: SegmentBuilderConfig,
567 ) -> Result<Self> {
568 let directory = Arc::new(directory);
569
570 let schema_slice = directory.open_read(Path::new("schema.json")).await?;
572 let schema_bytes = schema_slice.read_bytes().await?;
573 let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
574 .map_err(|e| Error::Serialization(e.to_string()))?;
575 let schema = Arc::new(schema);
576
577 let segments_slice = directory.open_read(Path::new("segments.json")).await?;
579 let segments_bytes = segments_slice.read_bytes().await?;
580 let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
581 .map_err(|e| Error::Serialization(e.to_string()))?;
582
583 let num_builders = config.num_indexing_threads.max(1);
585 let mut builders = Vec::with_capacity(num_builders);
586 for _ in 0..num_builders {
587 builders.push(AsyncMutex::new(None));
588 }
589
590 Ok(Self {
591 directory,
592 schema,
593 config,
594 builder_config,
595 tokenizers: FxHashMap::default(),
596 builders,
597 segment_ids: AsyncMutex::new(segment_ids),
598 background_builds: AsyncMutex::new(Vec::new()),
599 })
600 }
601
602 pub fn schema(&self) -> &Schema {
604 &self.schema
605 }
606
607 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
609 self.tokenizers.insert(field, Box::new(tokenizer));
610 }
611
612 pub async fn add_document(&self, doc: Document) -> Result<DocId> {
618 let builder_idx = rand::rng().random_range(0..self.builders.len());
620
621 let mut builder_guard = self.builders[builder_idx].lock().await;
622
623 if builder_guard.is_none() {
625 let mut builder =
626 SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
627 for (field, tokenizer) in &self.tokenizers {
628 builder.set_tokenizer(*field, tokenizer.clone_box());
629 }
630 *builder_guard = Some(builder);
631 }
632
633 let builder = builder_guard.as_mut().unwrap();
634 let doc_id = builder.add_document(doc)?;
635
636 if builder.num_docs() >= self.config.max_docs_per_segment {
638 let full_builder = builder_guard.take().unwrap();
639 drop(builder_guard); self.spawn_background_build(full_builder).await;
641 }
642
643 Ok(doc_id)
644 }
645
646 async fn spawn_background_build(&self, builder: SegmentBuilder) {
648 let directory = Arc::clone(&self.directory);
649 let segment_id = SegmentId::new();
650 let segment_hex = segment_id.to_hex();
651
652 let handle = tokio::spawn(async move {
653 builder.build(directory.as_ref(), segment_id).await?;
654 Ok(segment_hex)
655 });
656
657 let mut builds = self.background_builds.lock().await;
658 builds.push(handle);
659 }
660
661 async fn wait_for_background_builds(&self) -> Result<()> {
663 let mut builds = self.background_builds.lock().await;
664 let mut completed_ids = Vec::new();
665
666 for handle in builds.drain(..) {
667 match handle.await {
668 Ok(Ok(segment_hex)) => completed_ids.push(segment_hex),
669 Ok(Err(e)) => return Err(e),
670 Err(e) => return Err(Error::Io(std::io::Error::other(e))),
671 }
672 }
673 drop(builds);
674
675 if !completed_ids.is_empty() {
677 let mut segment_ids = self.segment_ids.lock().await;
678 segment_ids.extend(completed_ids);
679
680 let segments_bytes = serde_json::to_vec(&*segment_ids)
681 .map_err(|e| Error::Serialization(e.to_string()))?;
682 self.directory
683 .write(Path::new("segments.json"), &segments_bytes)
684 .await?;
685 }
686
687 Ok(())
688 }
689
690 pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
692 let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;
693
694 for builder_mutex in &self.builders {
695 let guard = builder_mutex.lock().await;
696 if let Some(builder) = guard.as_ref() {
697 let stats = builder.stats();
698 if let Some(ref mut total) = total_stats {
699 total.num_docs += stats.num_docs;
700 total.unique_terms += stats.unique_terms;
701 total.postings_in_memory += stats.postings_in_memory;
702 total.interned_strings += stats.interned_strings;
703 total.doc_field_lengths_size += stats.doc_field_lengths_size;
704 } else {
705 total_stats = Some(stats);
706 }
707 }
708 }
709
710 total_stats
711 }
712
713 pub async fn commit(&self) -> Result<()> {
718 self.wait_for_background_builds().await?;
720
721 let mut builders_to_commit = Vec::new();
723
724 for builder_mutex in &self.builders {
725 let mut guard = builder_mutex.lock().await;
726 if let Some(builder) = guard.take()
727 && builder.num_docs() > 0
728 {
729 builders_to_commit.push(builder);
730 }
731 }
732
733 let mut handles = Vec::new();
735 for builder in builders_to_commit {
736 let directory = Arc::clone(&self.directory);
737 let segment_id = SegmentId::new();
738 let segment_hex = segment_id.to_hex();
739
740 let handle = tokio::spawn(async move {
741 builder.build(directory.as_ref(), segment_id).await?;
742 Ok::<_, Error>(segment_hex)
743 });
744 handles.push(handle);
745 }
746
747 let mut new_segment_ids = Vec::new();
749 for handle in handles {
750 match handle.await {
751 Ok(Ok(segment_hex)) => new_segment_ids.push(segment_hex),
752 Ok(Err(e)) => return Err(e),
753 Err(e) => return Err(Error::Io(std::io::Error::other(e))),
754 }
755 }
756
757 if !new_segment_ids.is_empty() {
759 let mut segment_ids = self.segment_ids.lock().await;
760 segment_ids.extend(new_segment_ids);
761
762 let segments_bytes = serde_json::to_vec(&*segment_ids)
763 .map_err(|e| Error::Serialization(e.to_string()))?;
764 self.directory
765 .write(Path::new("segments.json"), &segments_bytes)
766 .await?;
767 }
768
769 Ok(())
770 }
771
772 async fn do_merge(&self) -> Result<()> {
774 let segment_ids = self.segment_ids.lock().await;
775
776 if segment_ids.len() < 2 {
777 return Ok(());
778 }
779
780 let ids_to_merge: Vec<String> = segment_ids.clone();
781 drop(segment_ids);
782
783 let mut readers = Vec::new();
785 let mut doc_offset = 0u32;
786
787 for id_str in &ids_to_merge {
788 let segment_id = SegmentId::from_hex(id_str)
789 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
790 let reader = SegmentReader::open(
791 self.directory.as_ref(),
792 segment_id,
793 Arc::clone(&self.schema),
794 doc_offset,
795 self.config.term_cache_blocks,
796 )
797 .await?;
798 doc_offset += reader.meta().num_docs;
799 readers.push(reader);
800 }
801
802 let merger = SegmentMerger::new(Arc::clone(&self.schema));
804 let new_segment_id = SegmentId::new();
805 merger
806 .merge(self.directory.as_ref(), &readers, new_segment_id)
807 .await?;
808
809 let mut segment_ids = self.segment_ids.lock().await;
811 segment_ids.clear();
812 segment_ids.push(new_segment_id.to_hex());
813
814 let segments_bytes =
815 serde_json::to_vec(&*segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
816 self.directory
817 .write(Path::new("segments.json"), &segments_bytes)
818 .await?;
819
820 for id_str in ids_to_merge {
822 if let Some(segment_id) = SegmentId::from_hex(&id_str) {
823 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
824 }
825 }
826
827 Ok(())
828 }
829
830 pub async fn force_merge(&self) -> Result<()> {
832 self.commit().await?;
834 self.do_merge().await
836 }
837}
838
#[cfg(test)]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    /// Creates a writer for a brand-new index stored in (a clone of) `dir`.
    async fn new_writer(
        dir: &RamDirectory,
        schema: &Schema,
        config: &IndexConfig,
    ) -> IndexWriter<RamDirectory> {
        IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap()
    }

    /// Builds a document holding one text value per `(field, text)` pair, in order.
    fn text_doc(values: &[(Field, &str)]) -> Document {
        let mut doc = Document::new();
        for &(field, text) in values {
            doc.add_text(field, text);
        }
        doc
    }

    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut builder = SchemaBuilder::default();
        let title = builder.add_text_field("title", true, true);
        let body = builder.add_text_field("body", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let writer = new_writer(&dir, &schema, &config).await;

        let rows = [
            ("Hello World", "This is the first document"),
            ("Goodbye World", "This is the second document"),
        ];
        for (heading, text) in rows {
            writer
                .add_document(text_doc(&[(title, heading), (body, text)]))
                .await
                .unwrap();
        }
        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // One commit with a single indexing thread yields one segment whose
        // posting list for "world" covers both titles.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let first = index.doc(0).await.unwrap().unwrap();
        assert_eq!(first.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    #[tokio::test]
    async fn test_multiple_segments() {
        let mut builder = SchemaBuilder::default();
        let title = builder.add_text_field("title", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5,
            ..Default::default()
        };
        let writer = new_writer(&dir, &schema, &config).await;

        // Three commits of exactly one segment's worth of documents each.
        for batch in 0..3 {
            for i in 0..5 {
                let text = format!("Document {} batch {}", i, batch);
                writer
                    .add_document(text_doc(&[(title, text.as_str())]))
                    .await
                    .unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    #[tokio::test]
    async fn test_segment_merge() {
        let mut builder = SchemaBuilder::default();
        let title = builder.add_text_field("title", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            ..Default::default()
        };
        let writer = new_writer(&dir, &schema, &config).await;

        for i in 0..9 {
            let text = format!("Document {}", i);
            writer
                .add_document(text_doc(&[(title, text.as_str())]))
                .await
                .unwrap();
        }
        writer.commit().await.unwrap();

        // Nine documents at three per segment give three segments before merging.
        let before = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(before.segment_readers().len(), 3);

        let merging_writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        merging_writer.force_merge().await.unwrap();

        let after = Index::open(dir, config).await.unwrap();
        assert_eq!(after.segment_readers().len(), 1);
        assert_eq!(after.num_docs(), 9);

        // Merging must keep documents in their original order.
        for i in 0..9 {
            let doc = after.doc(i).await.unwrap().unwrap();
            let expected = format!("Document {}", i);
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(expected.as_str())
            );
        }
    }

    #[tokio::test]
    async fn test_match_query() {
        let mut builder = SchemaBuilder::default();
        let title = builder.add_text_field("title", true, true);
        let body = builder.add_text_field("body", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let writer = new_writer(&dir, &schema, &config).await;

        writer
            .add_document(text_doc(&[
                (title, "rust programming"),
                (body, "Learn rust language"),
            ]))
            .await
            .unwrap();
        writer
            .add_document(text_doc(&[
                (title, "python programming"),
                (body, "Learn python language"),
            ]))
            .await
            .unwrap();
        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let single_term = index.query("rust", 10).await.unwrap();
        assert_eq!(single_term.hits.len(), 1);

        let two_terms = index.query("rust programming", 10).await.unwrap();
        assert!(!two_terms.hits.is_empty());

        let top = &two_terms.hits[0];
        assert!(!top.address.segment_id.is_empty(), "Should have segment_id");

        let by_address = index.get_document(&top.address).await.unwrap().unwrap();
        assert!(
            !by_address.field_values().is_empty(),
            "Doc should have field values"
        );

        let by_id = index.doc(0).await.unwrap().unwrap();
        assert!(
            !by_id.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut builder = SchemaBuilder::default();
        let title = builder.add_text_field("title", true, true);
        let body = builder.add_text_field("body", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let writer = new_writer(&dir, &schema, &config).await;

        for i in 0..10 {
            let heading = format!("Document {} about rust", i);
            let text = format!("This is body text number {}", i);
            writer
                .add_document(text_doc(&[(title, heading.as_str()), (body, text.as_str())]))
                .await
                .unwrap();
        }
        writer.commit().await.unwrap();

        // Warm a cache by searching through a caching directory, then persist it.
        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let warm = Index::open(caching_dir, config.clone()).await.unwrap();

        let warm_results = warm.query("rust", 10).await.unwrap();
        assert!(!warm_results.hits.is_empty());

        let warm_stats = warm.slice_cache_stats();
        assert!(warm_stats.total_bytes > 0, "Cache should have data after search");

        warm.save_slice_cache().await.unwrap();
        assert!(
            dir.exists(Path::new(super::SLICE_CACHE_FILENAME))
                .await
                .unwrap()
        );

        // A new index opened with the saved cache starts pre-filled.
        let prefilled = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        let prefilled_stats = prefilled.slice_cache_stats();
        assert!(
            prefilled_stats.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        let prefilled_results = prefilled.query("rust", 10).await.unwrap();
        assert_eq!(warm_results.hits.len(), prefilled_results.hits.len());
    }

    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut builder = SchemaBuilder::default();
        let uris = builder.add_text_field("uris", true, true);
        let title = builder.add_text_field("title", true, true);
        let schema = builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();
        let writer = new_writer(&dir, &schema, &config).await;

        // First document carries two values for `uris`, the second only one.
        writer
            .add_document(text_doc(&[
                (uris, "one"),
                (uris, "two"),
                (title, "Test Document"),
            ]))
            .await
            .unwrap();
        writer
            .add_document(text_doc(&[(uris, "three"), (title, "Another Document")]))
            .await
            .unwrap();
        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        let first = index.doc(0).await.unwrap().unwrap();
        let stored_uris: Vec<_> = first.get_all(uris).collect();
        assert_eq!(stored_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(stored_uris[0].as_text(), Some("one"));
        assert_eq!(stored_uris[1].as_text(), Some("two"));

        let json = first.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_values = uris_json.as_array().unwrap();
        assert_eq!(uris_values.len(), 2);
        assert_eq!(uris_values[0].as_str(), Some("one"));
        assert_eq!(uris_values[1].as_str(), Some("two"));

        for (query, expected_doc, message) in [
            ("uris:one", 0, "Should find doc with 'one'"),
            ("uris:two", 0, "Should find doc with 'two'"),
            ("uris:three", 1, "Should find doc with 'three'"),
        ] {
            let found = index.query(query, 10).await.unwrap();
            assert_eq!(found.hits.len(), 1, "{}", message);
            assert_eq!(found.hits[0].address.doc_id, expected_doc);
        }

        let missing = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(missing.hits.len(), 0, "Should not find non-existent value");
    }
}