#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

#[cfg(feature = "native")]
/// Number of in-memory postings a single term may buffer before its list is
/// spilled to disk.
const POSTING_FLUSH_THRESHOLD: usize = 10_000;

#[cfg(feature = "native")]
/// Buffer size for the temporary store and spill file writers (16 MiB).
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[cfg(feature = "native")]
/// Shard count for the inverted index. Must be a power of two so that
/// `key & (NUM_INDEX_SHARDS - 1)` works as a cheap modulo.
const NUM_INDEX_SHARDS: usize = 64;

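// Back-of-envelope sizing with the defaults below: a `posting_map_capacity`
// of 500_000 spread over 64 shards pre-sizes each shard map for ~7,800 terms,
// and one term at the spill threshold buffers 10_000 postings * 6 bytes
// = ~60 KB of arena memory before being flushed to disk.
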
#[cfg(feature = "native")]
/// Key identifying a posting list: an interned term within a specific field.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Route this key to a shard using the low bits of the interner id.
    /// `NUM_INDEX_SHARDS` is a power of two, so the mask is a cheap modulo.
    #[inline]
    fn shard(&self) -> usize {
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

#[cfg(feature = "native")]
/// Inverted index split across `NUM_INDEX_SHARDS` hash maps, keeping each
/// map smaller and its rehashes cheaper.
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, PostingListMeta>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut PostingListMeta> {
        self.shards[key.shard()].get_mut(key)
    }

    /// Get the metadata for `key`, inserting a fresh entry whose arena slot
    /// index is `arena_len` if the term has not been seen before.
    #[inline]
    fn get_or_insert(&mut self, key: TermKey, arena_len: u32) -> &mut PostingListMeta {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(|| PostingListMeta::new(arena_len))
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    /// Total number of postings currently buffered in memory (excludes
    /// postings already spilled to disk).
    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.len as usize)
            .sum()
    }

    /// (min, max, avg) number of terms per shard, for diagnostics.
    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}

#[cfg(feature = "native")]
/// Iterator over all `(TermKey, PostingListMeta)` pairs across every shard.
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, PostingListMeta>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, PostingListMeta>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a PostingListMeta);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            // Current shard exhausted: advance to the next one.
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a PostingListMeta);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

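// The borrowing `IntoIterator` impl hides the sharding from callers, so a
// whole-index scan reads naturally, e.g. in `build_postings` below:
//
//     for (term_key, meta) in &self.inverted_index { /* ... */ }
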
#[cfg(feature = "native")]
/// One posting packed into 6 bytes (4-byte doc id + 2-byte term frequency).
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

#[cfg(feature = "native")]
/// Per-term bookkeeping for a posting list.
#[derive(Clone, Copy)]
struct PostingListMeta {
    /// Index of this term's slot in the `PostingArena`.
    start: u32,
    /// Number of postings currently buffered in the arena.
    len: u32,
    /// Most recent doc id added, used to merge repeats within one document.
    last_doc_id: DocId,
    /// File offset of the most recently spilled block, or -1 if never spilled.
    spill_offset: i64,
    /// Total number of postings spilled to disk so far.
    spill_count: u32,
}

#[cfg(feature = "native")]
impl PostingListMeta {
    fn new(start: u32) -> Self {
        Self {
            start,
            len: 0,
            last_doc_id: u32::MAX,
            spill_offset: -1,
            spill_count: 0,
        }
    }

    fn total_count(&self) -> usize {
        self.len as usize + self.spill_count as usize
    }

    fn needs_spill(&self) -> bool {
        self.len as usize >= POSTING_FLUSH_THRESHOLD
    }
}

#[cfg(feature = "native")]
/// Arena holding the in-memory postings of every term.
///
/// Each term owns one slot (`data[meta.start]`), so its postings stay
/// contiguous no matter how documents interleave terms.
struct PostingArena {
    data: Vec<Vec<CompactPosting>>,
}

#[cfg(feature = "native")]
impl PostingArena {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
        }
    }

    /// Append one posting for the term described by `meta`.
    #[inline]
    fn add(&mut self, meta: &mut PostingListMeta, doc_id: DocId, term_freq: u32) {
        // First posting for a freshly inserted term: allocate its slot.
        if meta.start as usize == self.data.len() {
            self.data.push(Vec::new());
        }
        let slot = &mut self.data[meta.start as usize];

        // Same document again (e.g. a multi-valued field): merge into the
        // last posting instead of emitting a duplicate doc id.
        if meta.len > 0 && meta.last_doc_id == doc_id {
            if let Some(posting) = slot.last_mut() {
                posting.term_freq = posting.term_freq.saturating_add(term_freq as u16);
            }
            return;
        }

        slot.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
        meta.len += 1;
        meta.last_doc_id = doc_id;
    }

    fn get_postings(&self, meta: &PostingListMeta) -> &[CompactPosting] {
        &self.data[meta.start as usize]
    }

    /// Reset a term's slot after its postings were spilled; the slot (and
    /// its capacity) is kept so the term can keep accumulating.
    fn clear_postings(&mut self, meta: &mut PostingListMeta) {
        self.data[meta.start as usize].clear();
        meta.len = 0;
    }

    /// Number of term slots, used as the `start` for the next new term.
    fn len(&self) -> usize {
        self.data.len()
    }
}

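// Example of the merge behavior in `PostingArena::add`: adding (doc 7, tf 2)
// and then (doc 7, tf 3) for the same term stores a single posting with
// term_freq 5, because consecutive postings for one document are combined.
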
#[cfg(feature = "native")]
/// Point-in-time indexing statistics, for logging and memory diagnostics.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Unique (field, term) keys in the inverted index.
    pub unique_terms: usize,
    /// Postings currently buffered in memory.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Entries in the per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Fewest terms in any shard.
    pub shard_min: usize,
    /// Most terms in any shard.
    pub shard_max: usize,
    /// Mean terms per shard.
    pub shard_avg: usize,
    /// Bytes written to the spill file so far.
    pub spill_bytes: u64,
}

#[cfg(feature = "native")]
/// Tuning knobs for `SegmentBuilder`.
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary store/spill files.
    pub temp_dir: PathBuf,
    /// Compression level for the document store.
    pub compression_level: CompressionLevel,
    /// Worker threads available for store compression.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Capacity hint for the posting map, split across shards.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

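// Typical lifecycle (sketch; schema construction and error handling elided):
//
//     let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//     let doc_id = builder.add_document(doc)?;
//     let stats = builder.stats();
//     let meta = builder.build(&dir, segment_id).await?;
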
#[cfg(feature = "native")]
/// Builds one immutable segment from a stream of documents.
///
/// Postings accumulate in a sharded in-memory index backed by a posting
/// arena; oversized posting lists spill to a temp file, and documents are
/// streamed to a temp store file so peak memory stays bounded. `build`
/// merges everything into the final on-disk segment.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` ids.
    term_interner: Rodeo,

    /// Sharded map from `(field, term)` to posting-list metadata.
    inverted_index: ShardedInvertedIndex,

    /// In-memory posting storage, one slot per term.
    posting_arena: PostingArena,

    /// Temp file receiving length-prefixed serialized documents.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Temp file receiving spilled posting blocks (created lazily).
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Next doc id to assign; also the number of docs added so far.
    next_doc_id: DocId,

    /// Per-field token and document counts.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened `num_docs x num_indexed_fields` table of per-field token
    /// counts, indexed via `field_to_slot`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    wand_data: Option<Arc<WandData>>,

    /// Scratch map of term frequencies for the document being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch list of terms that crossed the spill threshold.
    terms_to_spill_buffer: Vec<TermKey>,

    /// Scratch buffer for the token currently being normalized.
    token_buffer: String,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        // Unique temp-file names so concurrent builders never collide.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot in `doc_field_lengths`.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            // One arena slot per expected term.
            posting_arena: PostingArena::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    /// Create a builder that carries precomputed WAND data (e.g. per-term
    /// score upper bounds used for top-k pruning).
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Snapshot of current indexing statistics.
    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

    /// Add one document: index its fields, record stats, and append it to
    /// the temporary document store. Returns the assigned doc id.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row in the field-length table.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Record this field's token count for length normalization.
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Tokenize `text` and add one posting per distinct term.
    ///
    /// Note: this hot path uses a fixed inline tokenizer (split on
    /// whitespace, keep alphanumeric chars, lowercase) rather than the
    /// per-field `BoxedTokenizer`s registered via `set_tokenizer`.
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Aggregate term frequencies locally so each (term, doc) pair
        // touches the inverted index exactly once.
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            // Normalize into a reused buffer to avoid per-token allocations.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let arena_len = self.posting_arena.len() as u32;
            let meta = self.inverted_index.get_or_insert(term_key, arena_len);
            self.posting_arena.add(meta, doc_id, tf);

            // Spilling needs `&mut self`, so just collect candidates here.
            if meta.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Indexed loop: iterating the buffer directly would hold a borrow
        // of `self` across the `&mut self` call below.
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

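    // Worked example: `index_text_field` turns "Hello, world! HELLO" into the
    // tokens ["hello", "world", "hello"], returning token_count = 3 and
    // feeding local term frequencies {hello: 2, world: 1} into the index.
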
    /// Index a numeric value as an exact-match term with term frequency 1.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // Encode the value as a synthetic term, e.g. 42 -> "__num_42".
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let arena_len = self.posting_arena.len() as u32;
        let meta = self.inverted_index.get_or_insert(term_key, arena_len);
        self.posting_arena.add(meta, doc_id, 1);

        Ok(())
    }

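    // All numeric types funnel through the same u64 keyspace: u64 as-is,
    // i64 via an `as u64` cast, and f64 via `to_bits()`, so lookups match
    // exact values only.
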
    /// Append `doc` to the temp store as a `[len: u32 LE][bytes]` record.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Flush a term's in-memory postings to the spill file as one block,
    /// chained to the term's previous block (if any).
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // Create the spill file lazily: small segments never pay for it.
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let meta = self.inverted_index.get_mut(&term_key).unwrap();
        let postings = self.posting_arena.get_postings(meta);

        let block_offset = self.spill_offset as i64;
        let spill_file = self.spill_file.as_mut().unwrap();

        // Block header: offset of the term's previous block (-1 if none)
        // and the number of postings that follow.
        spill_file.write_i64::<LittleEndian>(meta.spill_offset)?;
        spill_file.write_u32::<LittleEndian>(meta.len)?;
        self.spill_offset += 12;

        for p in postings {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6;
        }

        // Point the chain head at the block we just wrote.
        meta.spill_offset = block_offset;
        meta.spill_count += meta.len;

        self.posting_arena.clear_postings(meta);

        Ok(())
    }

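    // Spill file layout (little-endian), one block per flush:
    //
    //     [prev_block_offset: i64][count: u32][count x (doc_id: u32, term_freq: u16)]
    //
    // A term's blocks form a backwards-linked chain headed by
    // `PostingListMeta::spill_offset`; an offset of -1 terminates the chain.
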
    /// Finalize the segment: flush temp files, merge in-memory and spilled
    /// postings, compress the store, and write all segment files to `dir`.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush buffered writers so the temp files can be read back.
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup; `Drop` also removes these temp files.
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

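    // A finished segment therefore consists of four artifacts: the term
    // dictionary (an SSTable of `TermInfo`), the serialized posting lists,
    // the compressed document store, and the segment metadata.
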
    /// Merge spilled and in-memory postings for every term, in sorted key
    /// order, into the term dictionary (SSTable) and posting-list bytes.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // Sort terms by (field id, term bytes); the SSTable writer requires
        // keys in ascending order.
        let mut sorted_terms: BTreeMap<Vec<u8>, &PostingListMeta> = BTreeMap::new();

        for (term_key, meta) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, meta);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map the spill file (if any) for random access to blocks.
        let spill_mmap = if self.spill_file.is_some() {
            // Drop the writer first so all buffered bytes reach the file.
            drop(self.spill_file.take());
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, meta) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(meta.total_count());

            // Replay spilled blocks first. The chain links newest -> oldest,
            // so walk it fully, then emit blocks in reverse (oldest first)
            // to keep doc ids ascending.
            if meta.spill_offset >= 0
                && let Some(ref mmap) = spill_mmap
            {
                let mut blocks: Vec<(usize, u32)> = Vec::new();
                let mut block_offset = meta.spill_offset;
                while block_offset >= 0 {
                    let off = block_offset as usize;
                    let prev = i64::from_le_bytes(mmap[off..off + 8].try_into().unwrap());
                    let count = u32::from_le_bytes(mmap[off + 8..off + 12].try_into().unwrap());
                    blocks.push((off + 12, count));
                    block_offset = prev;
                }
                for &(start, count) in blocks.iter().rev() {
                    let mut offset = start;
                    for _ in 0..count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            // Then append whatever is still buffered in the arena.
            let arena_postings = self.posting_arena.get_postings(meta);
            for p in arena_postings {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            // Small lists inline directly into the term dictionary entry;
            // larger ones serialize as block posting lists.
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Re-read the temp store file and re-encode it as the final compressed
    /// document store.
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Close the temp store by swapping in a throwaway writer so the
        // real file can be reopened for reading. (`/dev/null` makes this
        // Unix-specific.)
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        // Replay the [len: u32 LE][doc bytes] records written during indexing.
        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            // Skip records that fail to deserialize rather than aborting
            // the whole build.
            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of temp files if the builder is dropped
        // without a successful `build`.
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}