1use std::path::Path;
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12
13use crate::DocId;
14use crate::directories::{Directory, SliceCachingDirectory};
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{SegmentId, SegmentReader};
18use crate::structures::BlockPostingList;
19
20#[cfg(feature = "native")]
21mod writer;
22#[cfg(feature = "native")]
23pub use writer::IndexWriter;
24
/// File name under which the serialized slice cache is written to, and read
/// back from, the underlying (non-caching) directory.
pub const SLICE_CACHE_FILENAME: &str = "index.slicecache";
27
28#[derive(Debug, Clone)]
30pub struct IndexConfig {
31 pub num_threads: usize,
33 pub num_indexing_threads: usize,
35 pub num_compression_threads: usize,
37 pub term_cache_blocks: usize,
39 pub store_cache_blocks: usize,
41 pub max_docs_per_segment: u32,
43 pub merge_policy: Box<dyn crate::merge::MergePolicy>,
45 pub optimization: crate::structures::IndexOptimization,
47}
48
49impl Default for IndexConfig {
50 fn default() -> Self {
51 #[cfg(feature = "native")]
52 let cpus = num_cpus::get().max(1);
53 #[cfg(not(feature = "native"))]
54 let cpus = 1;
55
56 Self {
57 num_threads: cpus,
58 num_indexing_threads: 1,
59 num_compression_threads: cpus,
60 term_cache_blocks: 256,
61 store_cache_blocks: 32,
62 max_docs_per_segment: 100_000,
63 merge_policy: Box::new(crate::merge::TieredMergePolicy::default()),
64 optimization: crate::structures::IndexOptimization::default(),
65 }
66 }
67}
68
69pub struct Index<D: Directory> {
74 directory: Arc<D>,
75 schema: Arc<Schema>,
76 config: IndexConfig,
77 segments: RwLock<Vec<Arc<SegmentReader>>>,
78 default_fields: Vec<crate::Field>,
79 tokenizers: Arc<crate::tokenizer::TokenizerRegistry>,
80 #[cfg(feature = "native")]
81 thread_pool: Arc<rayon::ThreadPool>,
82}
83
84impl<D: Directory> Index<D> {
85 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
87 let directory = Arc::new(directory);
88
89 let schema_slice = directory.open_read(Path::new("schema.json")).await?;
91 let schema_bytes = schema_slice.read_bytes().await?;
92 let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
93 .map_err(|e| Error::Serialization(e.to_string()))?;
94 let schema = Arc::new(schema);
95
96 let segments = Self::load_segments(&directory, &schema, &config).await?;
98
99 #[cfg(feature = "native")]
100 let thread_pool = {
101 let pool = rayon::ThreadPoolBuilder::new()
102 .num_threads(config.num_threads)
103 .build()
104 .map_err(|e| Error::Io(std::io::Error::other(e)))?;
105 Arc::new(pool)
106 };
107
108 let default_fields: Vec<crate::Field> = if !schema.default_fields().is_empty() {
110 schema.default_fields().to_vec()
111 } else {
112 schema
113 .fields()
114 .filter(|(_, entry)| {
115 entry.indexed && entry.field_type == crate::dsl::FieldType::Text
116 })
117 .map(|(field, _)| field)
118 .collect()
119 };
120
121 Ok(Self {
122 directory,
123 schema,
124 config,
125 segments: RwLock::new(segments),
126 default_fields,
127 tokenizers: Arc::new(crate::tokenizer::TokenizerRegistry::default()),
128 #[cfg(feature = "native")]
129 thread_pool,
130 })
131 }
132
133 async fn load_segments(
134 directory: &Arc<D>,
135 schema: &Arc<Schema>,
136 config: &IndexConfig,
137 ) -> Result<Vec<Arc<SegmentReader>>> {
138 let segments_path = Path::new("segments.json");
140 if !directory.exists(segments_path).await? {
141 return Ok(Vec::new());
142 }
143
144 let segments_slice = directory.open_read(segments_path).await?;
145 let segments_bytes = segments_slice.read_bytes().await?;
146 let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
147 .map_err(|e| Error::Serialization(e.to_string()))?;
148
149 let mut segments = Vec::new();
150 let mut doc_id_offset = 0u32;
151
152 for id_str in segment_ids {
153 let segment_id = SegmentId::from_hex(&id_str)
154 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
155 let reader = SegmentReader::open(
156 directory.as_ref(),
157 segment_id,
158 Arc::clone(schema),
159 doc_id_offset,
160 config.term_cache_blocks,
161 )
162 .await?;
163
164 doc_id_offset += reader.meta().num_docs;
165 segments.push(Arc::new(reader));
166 }
167
168 Ok(segments)
169 }
170
171 pub fn schema(&self) -> &Schema {
173 &self.schema
174 }
175
176 pub fn directory(&self) -> &D {
178 &self.directory
179 }
180
181 pub fn num_docs(&self) -> u32 {
183 self.segments.read().iter().map(|s| s.num_docs()).sum()
184 }
185
186 pub async fn doc(&self, doc_id: DocId) -> Result<Option<Document>> {
188 let segments = self.segments.read().clone();
189
190 let mut offset = 0u32;
191 for segment in segments.iter() {
192 let segment_docs = segment.meta().num_docs;
193 if doc_id < offset + segment_docs {
194 let local_doc_id = doc_id - offset;
195 return segment.doc(local_doc_id).await;
196 }
197 offset += segment_docs;
198 }
199
200 Ok(None)
201 }
202
203 pub async fn get_postings(
205 &self,
206 field: Field,
207 term: &[u8],
208 ) -> Result<Vec<(Arc<SegmentReader>, BlockPostingList)>> {
209 let segments = self.segments.read().clone();
210 let mut results = Vec::new();
211
212 for segment in segments.iter() {
213 if let Some(postings) = segment.get_postings(field, term).await? {
214 results.push((Arc::clone(segment), postings));
215 }
216 }
217
218 Ok(results)
219 }
220
/// Runs `f` on the index's worker pool and returns its result.
///
/// The closure's return value is sent back over a one-shot channel that
/// this future awaits.
///
/// # Panics
/// Panics with "Thread pool task panicked" if the channel is closed
/// before a result is sent.
#[cfg(feature = "native")]
pub async fn spawn_blocking<F, R>(&self, f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();

    let job = move || {
        // A failed send means the receiver is gone; the result is discarded.
        let _ = result_tx.send(f());
    };
    self.thread_pool.spawn(job);

    result_rx.await.expect("Thread pool task panicked")
}
235
236 pub fn segment_readers(&self) -> Vec<Arc<SegmentReader>> {
238 self.segments.read().clone()
239 }
240
241 pub async fn reload(&self) -> Result<()> {
243 let new_segments = Self::load_segments(&self.directory, &self.schema, &self.config).await?;
244 *self.segments.write() = new_segments;
245 Ok(())
246 }
247
248 pub async fn search(
250 &self,
251 query: &dyn crate::query::Query,
252 limit: usize,
253 ) -> Result<Vec<crate::query::SearchResult>> {
254 let segments = self.segments.read().clone();
255 let mut all_results = Vec::new();
256
257 for segment in &segments {
258 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
259 all_results.extend(results);
260 }
261
262 all_results.sort_by(|a, b| {
264 b.score
265 .partial_cmp(&a.score)
266 .unwrap_or(std::cmp::Ordering::Equal)
267 });
268 all_results.truncate(limit);
269
270 Ok(all_results)
271 }
272
273 pub async fn search_with_addresses(
275 &self,
276 query: &dyn crate::query::Query,
277 limit: usize,
278 ) -> Result<crate::query::SearchResponse> {
279 let segments = self.segments.read().clone();
280 let mut all_results: Vec<(u128, crate::query::SearchResult)> = Vec::new();
281
282 for segment in &segments {
283 let segment_id = segment.meta().id;
284 let results = crate::query::search_segment(segment.as_ref(), query, limit).await?;
285 for result in results {
286 all_results.push((segment_id, result));
287 }
288 }
289
290 all_results.sort_by(|a, b| {
292 b.1.score
293 .partial_cmp(&a.1.score)
294 .unwrap_or(std::cmp::Ordering::Equal)
295 });
296 all_results.truncate(limit);
297
298 let total_hits = all_results.len() as u32;
299 let hits: Vec<crate::query::SearchHit> = all_results
300 .into_iter()
301 .map(|(segment_id, result)| crate::query::SearchHit {
302 address: crate::query::DocAddress::new(segment_id, result.doc_id),
303 score: result.score,
304 })
305 .collect();
306
307 Ok(crate::query::SearchResponse { hits, total_hits })
308 }
309
310 pub async fn get_document(
312 &self,
313 address: &crate::query::DocAddress,
314 ) -> Result<Option<Document>> {
315 let segment_id = address
316 .segment_id_u128()
317 .ok_or_else(|| Error::Query(format!("Invalid segment ID: {}", address.segment_id)))?;
318
319 let segments = self.segments.read().clone();
320 for segment in &segments {
321 if segment.meta().id == segment_id {
322 return segment.doc(address.doc_id).await;
323 }
324 }
325
326 Ok(None)
327 }
328
329 pub fn default_fields(&self) -> &[crate::Field] {
331 &self.default_fields
332 }
333
334 pub fn set_default_fields(&mut self, fields: Vec<crate::Field>) {
336 self.default_fields = fields;
337 }
338
339 pub fn tokenizers(&self) -> &Arc<crate::tokenizer::TokenizerRegistry> {
341 &self.tokenizers
342 }
343
344 pub fn query_parser(&self) -> crate::dsl::QueryLanguageParser {
349 let query_routers = self.schema.query_routers();
351 if !query_routers.is_empty() {
352 if let Ok(router) = crate::dsl::QueryFieldRouter::from_rules(query_routers) {
354 return crate::dsl::QueryLanguageParser::with_router(
355 Arc::clone(&self.schema),
356 self.default_fields.clone(),
357 Arc::clone(&self.tokenizers),
358 router,
359 );
360 }
361 }
362
363 crate::dsl::QueryLanguageParser::new(
365 Arc::clone(&self.schema),
366 self.default_fields.clone(),
367 Arc::clone(&self.tokenizers),
368 )
369 }
370
371 pub async fn query(
377 &self,
378 query_str: &str,
379 limit: usize,
380 ) -> Result<crate::query::SearchResponse> {
381 let parser = self.query_parser();
382 let query = parser.parse(query_str).map_err(Error::Query)?;
383 self.search_with_addresses(query.as_ref(), limit).await
384 }
385}
386
387impl<D: Directory> Index<SliceCachingDirectory<D>> {
389 pub async fn open_with_cache(
394 directory: D,
395 config: IndexConfig,
396 cache_max_bytes: usize,
397 ) -> Result<Self> {
398 let caching_dir = SliceCachingDirectory::new(directory, cache_max_bytes);
399
400 let cache_path = Path::new(SLICE_CACHE_FILENAME);
402 if let Ok(true) = caching_dir.inner().exists(cache_path).await
403 && let Ok(slice) = caching_dir.inner().open_read(cache_path).await
404 && let Ok(bytes) = slice.read_bytes().await
405 {
406 let _ = caching_dir.deserialize(bytes.as_slice());
407 }
408
409 Self::open(caching_dir, config).await
410 }
411
/// Serializes the current slice cache and writes it to
/// `SLICE_CACHE_FILENAME` in the inner directory.
///
/// # Errors
/// Propagates the write error from the inner directory.
#[cfg(feature = "native")]
pub async fn save_slice_cache(&self) -> Result<()>
where
    D: crate::directories::DirectoryWriter,
{
    let serialized = self.directory.serialize();
    let target = Path::new(SLICE_CACHE_FILENAME);

    // Written through the inner directory rather than the caching wrapper.
    let inner = self.directory.inner();
    inner.write(target, &serialized).await?;

    Ok(())
}
429
430 pub fn slice_cache_stats(&self) -> crate::directories::SliceCacheStats {
432 self.directory.stats()
433 }
434}
435
/// Opens the index through a fresh slice-caching directory and saves the
/// resulting cache.
///
/// The only warm-up performed is opening the index, so the saved cache
/// holds whatever was read while opening.
///
/// # Errors
/// Fails if the index cannot be opened or the cache file cannot be written.
#[cfg(feature = "native")]
pub async fn warmup_and_save_slice_cache<D: crate::directories::DirectoryWriter>(
    directory: D,
    config: IndexConfig,
    cache_max_bytes: usize,
) -> Result<()> {
    let warmed = Index::open(
        SliceCachingDirectory::new(directory, cache_max_bytes),
        config,
    )
    .await?;

    warmed.save_slice_cache().await
}
462
#[cfg(feature = "native")]
impl<D> Clone for Index<D>
where
    D: Directory,
{
    /// Creates a new handle that shares the directory, schema, tokenizers
    /// and thread pool with `self`.
    ///
    /// The clone gets its own segment list, initialised from a snapshot of
    /// the current one, so a later `reload` on one handle does not affect
    /// the other.
    fn clone(&self) -> Self {
        let segment_snapshot = self.segments.read().clone();

        Index {
            directory: Arc::clone(&self.directory),
            schema: Arc::clone(&self.schema),
            config: self.config.clone(),
            segments: RwLock::new(segment_snapshot),
            default_fields: self.default_fields.clone(),
            tokenizers: Arc::clone(&self.tokenizers),
            thread_pool: Arc::clone(&self.thread_pool),
        }
    }
}
477
// End-to-end tests for `Index`. They build indexes with `IndexWriter` and
// call `save_slice_cache`, both of which exist only with the `native`
// feature, so the module is gated on that feature as well as on `test`.
#[cfg(all(test, feature = "native"))]
mod tests {
    use super::*;
    use crate::directories::RamDirectory;
    use crate::dsl::SchemaBuilder;

    /// Writes two documents, then checks the doc count, the posting list
    /// for a shared title term, and stored-document retrieval.
    #[tokio::test]
    async fn test_index_create_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "Hello World");
        doc1.add_text(body, "This is the first document");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "Goodbye World");
        doc2.add_text(body, "This is the second document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        // One segment is expected, and both titles contain the term.
        let postings = index.get_postings(title, b"world").await.unwrap();
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].1.doc_count(), 2);

        let doc = index.doc(0).await.unwrap().unwrap();
        assert_eq!(doc.get_first(title).unwrap().as_text(), Some("Hello World"));
    }

    /// Commits three batches of five documents and expects three segments.
    #[tokio::test]
    async fn test_multiple_segments() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 5,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for batch in 0..3 {
            for i in 0..5 {
                let mut doc = Document::new();
                doc.add_text(title, format!("Document {} batch {}", i, batch));
                writer.add_document(doc).await.unwrap();
            }
            writer.commit().await.unwrap();
        }

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 15);
        assert_eq!(index.segment_readers().len(), 3);
    }

    /// Writes nine documents into three segments, force-merges them, and
    /// checks that every document is still retrievable by id afterwards.
    #[tokio::test]
    async fn test_segment_merge() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig {
            max_docs_per_segment: 3,
            ..Default::default()
        };

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..9 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        // Before the merge: nine docs at three per segment.
        let index = Index::open(dir.clone(), config.clone()).await.unwrap();
        assert_eq!(index.segment_readers().len(), 3);

        let writer = IndexWriter::open(dir.clone(), config.clone())
            .await
            .unwrap();
        writer.force_merge().await.unwrap();

        // After the merge: a single segment holding all nine docs.
        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.segment_readers().len(), 1);
        assert_eq!(index.num_docs(), 9);

        for i in 0..9 {
            let doc = index.doc(i).await.unwrap().unwrap();
            assert_eq!(
                doc.get_first(title).unwrap().as_text(),
                Some(format!("Document {}", i).as_str())
            );
        }
    }

    /// Runs string queries and fetches the hit both by address and by
    /// index-wide document id.
    #[tokio::test]
    async fn test_match_query() {
        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc1 = Document::new();
        doc1.add_text(title, "rust programming");
        doc1.add_text(body, "Learn rust language");
        writer.add_document(doc1).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(title, "python programming");
        doc2.add_text(body, "Learn python language");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1);

        let results = index.query("rust programming", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let hit = &results.hits[0];
        assert!(!hit.address.segment_id.is_empty(), "Should have segment_id");

        let doc = index.get_document(&hit.address).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );

        let doc = index.doc(0).await.unwrap().unwrap();
        assert!(
            !doc.field_values().is_empty(),
            "Doc should have field values"
        );
    }

    /// Fills the slice cache by searching, saves it, and checks that a
    /// second index opened with `open_with_cache` starts with a non-empty
    /// cache and returns the same number of hits.
    #[tokio::test]
    async fn test_slice_cache_warmup_and_load() {
        use crate::directories::SliceCachingDirectory;

        let mut schema_builder = SchemaBuilder::default();
        let title = schema_builder.add_text_field("title", true, true);
        let body = schema_builder.add_text_field("body", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        for i in 0..10 {
            let mut doc = Document::new();
            doc.add_text(title, format!("Document {} about rust", i));
            doc.add_text(body, format!("This is body text number {}", i));
            writer.add_document(doc).await.unwrap();
        }
        writer.commit().await.unwrap();

        let caching_dir = SliceCachingDirectory::new(dir.clone(), 1024 * 1024);
        let index = Index::open(caching_dir, config.clone()).await.unwrap();

        let results = index.query("rust", 10).await.unwrap();
        assert!(!results.hits.is_empty());

        let stats = index.slice_cache_stats();
        assert!(stats.total_bytes > 0, "Cache should have data after search");

        index.save_slice_cache().await.unwrap();

        // The cache file is written to the underlying directory.
        assert!(dir.exists(Path::new(SLICE_CACHE_FILENAME)).await.unwrap());

        let index2 = Index::open_with_cache(dir.clone(), config.clone(), 1024 * 1024)
            .await
            .unwrap();

        let stats2 = index2.slice_cache_stats();
        assert!(
            stats2.total_bytes > 0,
            "Cache should be prefilled from file"
        );

        let results2 = index2.query("rust", 10).await.unwrap();
        assert_eq!(results.hits.len(), results2.hits.len());
    }

    /// Adds the same field twice to one document and checks that both
    /// values are stored, serialized as a JSON array, and searchable.
    #[tokio::test]
    async fn test_multivalue_field_indexing_and_search() {
        let mut schema_builder = SchemaBuilder::default();
        let uris = schema_builder.add_text_field("uris", true, true);
        let title = schema_builder.add_text_field("title", true, true);
        let schema = schema_builder.build();

        let dir = RamDirectory::new();
        let config = IndexConfig::default();

        let writer = IndexWriter::create(dir.clone(), schema.clone(), config.clone())
            .await
            .unwrap();

        let mut doc = Document::new();
        doc.add_text(uris, "one");
        doc.add_text(uris, "two");
        doc.add_text(title, "Test Document");
        writer.add_document(doc).await.unwrap();

        let mut doc2 = Document::new();
        doc2.add_text(uris, "three");
        doc2.add_text(title, "Another Document");
        writer.add_document(doc2).await.unwrap();

        writer.commit().await.unwrap();

        let index = Index::open(dir, config).await.unwrap();
        assert_eq!(index.num_docs(), 2);

        let doc = index.doc(0).await.unwrap().unwrap();
        let all_uris: Vec<_> = doc.get_all(uris).collect();
        assert_eq!(all_uris.len(), 2, "Should have 2 uris values");
        assert_eq!(all_uris[0].as_text(), Some("one"));
        assert_eq!(all_uris[1].as_text(), Some("two"));

        let json = doc.to_json(index.schema());
        let uris_json = json.get("uris").unwrap();
        assert!(uris_json.is_array(), "Multi-value field should be an array");
        let uris_arr = uris_json.as_array().unwrap();
        assert_eq!(uris_arr.len(), 2);
        assert_eq!(uris_arr[0].as_str(), Some("one"));
        assert_eq!(uris_arr[1].as_str(), Some("two"));

        let results = index.query("uris:one", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'one'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:two", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'two'");
        assert_eq!(results.hits[0].address.doc_id, 0);

        let results = index.query("uris:three", 10).await.unwrap();
        assert_eq!(results.hits.len(), 1, "Should find doc with 'three'");
        assert_eq!(results.hits[0].address.doc_id, 1);

        let results = index.query("uris:nonexistent", 10).await.unwrap();
        assert_eq!(results.hits.len(), 0, "Should not find non-existent value");
    }
}