#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Key, Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

/// Number of in-memory postings a single term accumulates before its posting
/// list is spilled to the temporary spill file.
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 100_000;

/// Buffer size for the temporary store and spill file writers.
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Number of shards in the in-memory inverted index. Must be a power of two so
/// shard selection can use a bit mask.
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

/// Key identifying a (field, interned term) pair in the inverted index.
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: ShardedSpur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Index shard for this key, derived from the low bits of the packed term id.
    #[inline]
    fn shard(&self) -> usize {
        (self.term.packed as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

/// In-memory inverted index split across NUM_INDEX_SHARDS hash maps; a key's
/// shard is chosen from its packed term id.
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    /// Total number of unique (field, term) keys across all shards.
    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    /// Total number of postings currently buffered in memory across all shards.
    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.memory.len())
            .sum()
    }

    /// Minimum, maximum, and average number of terms per shard.
    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}
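
// Minimal sanity sketch (added for illustration; the module name and values are
// assumptions, not part of the original API surface): a term inserted through
// `get_or_insert` lands in exactly one shard slot and accumulates postings there.
#[cfg(all(test, feature = "native"))]
mod sharded_index_sketch {
    use super::*;

    #[test]
    fn get_or_insert_accumulates_postings_for_one_term() {
        let mut interner = ShardedTermInterner::new();
        let mut index = ShardedInvertedIndex::new(4);
        let key = TermKey {
            field: 0,
            term: interner.get_or_intern("title"),
        };

        // Two documents contain the same term; both postings join the same list.
        index.get_or_insert(key).add(7, 1);
        index.get_or_insert(key).add(8, 2);

        assert_eq!(index.len(), 1);
        assert_eq!(index.total_postings_in_memory(), 2);
        assert!(index.get_mut(&key).is_some());
    }
}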

#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

/// Number of shards in the term interner. Must be a power of two so shard
/// selection can use a bit mask.
#[cfg(feature = "native")]
const NUM_INTERNER_SHARDS: usize = 64;

/// Term id packing the interner shard and the shard-local id into one u32:
/// the upper 8 bits hold the shard index, the lower 24 bits the local id.
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct ShardedSpur {
    packed: u32,
}

#[cfg(feature = "native")]
impl ShardedSpur {
    #[inline]
    fn new(shard: u8, local_spur: Spur) -> Self {
        // Use the Key trait's zero-based id so `resolve` can rebuild the Spur
        // with `Spur::try_from_usize` (the two round-trip by contract).
        let local_id = local_spur.into_usize() as u32;
        debug_assert!(local_id < (1 << 24), "Local spur ID overflow");
        Self {
            packed: ((shard as u32) << 24) | (local_id & 0x00FF_FFFF),
        }
    }

    #[inline]
    fn shard(self) -> usize {
        (self.packed >> 24) as usize
    }

    #[inline]
    fn local_id(self) -> u32 {
        self.packed & 0x00FF_FFFF
    }
}
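
// Minimal sanity sketch (added for illustration; the module name and values are
// assumptions): the shard/local-id bit packing round-trips through ShardedSpur.
#[cfg(all(test, feature = "native"))]
mod sharded_spur_sketch {
    use super::*;

    #[test]
    fn packed_spur_round_trips_shard_and_local_id() {
        let local = Spur::try_from_usize(42).unwrap();
        let spur = ShardedSpur::new(7, local);

        assert_eq!(spur.shard(), 7);
        assert_eq!(spur.local_id() as usize, local.into_usize());
    }
}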

/// Term interner split across NUM_INTERNER_SHARDS `Rodeo`s; a string's shard is
/// chosen by hashing its bytes.
#[cfg(feature = "native")]
struct ShardedTermInterner {
    shards: Vec<Rodeo>,
}

#[cfg(feature = "native")]
impl ShardedTermInterner {
    fn new() -> Self {
        let mut shards = Vec::with_capacity(NUM_INTERNER_SHARDS);
        for _ in 0..NUM_INTERNER_SHARDS {
            shards.push(Rodeo::new());
        }
        Self { shards }
    }

    /// Picks the interner shard for a string by hashing its bytes.
    #[inline]
    fn shard_for_str(s: &str) -> usize {
        use std::hash::{Hash, Hasher};
        let mut hasher = rustc_hash::FxHasher::default();
        s.hash(&mut hasher);
        (hasher.finish() as usize) & (NUM_INTERNER_SHARDS - 1)
    }

    #[inline]
    fn get_or_intern(&mut self, s: &str) -> ShardedSpur {
        let shard_idx = Self::shard_for_str(s);
        let local_spur = self.shards[shard_idx].get_or_intern(s);
        ShardedSpur::new(shard_idx as u8, local_spur)
    }

    #[inline]
    fn resolve(&self, spur: ShardedSpur) -> &str {
        self.shards[spur.shard()].resolve(&Spur::try_from_usize(spur.local_id() as usize).unwrap())
    }

    /// Total number of interned strings across all shards.
    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }
}
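
// Minimal sanity sketch (added for illustration; the module name and strings are
// assumptions): interning the same string twice yields the same packed id, and
// `resolve` returns the original string.
#[cfg(all(test, feature = "native"))]
mod interner_sketch {
    use super::*;

    #[test]
    fn intern_and_resolve_round_trip() {
        let mut interner = ShardedTermInterner::new();
        let hello = interner.get_or_intern("hello");
        let world = interner.get_or_intern("world");

        assert!(hello == interner.get_or_intern("hello"));
        assert_eq!(interner.resolve(hello), "hello");
        assert_eq!(interner.resolve(world), "world");
        assert_eq!(interner.len(), 2);
    }
}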

/// A single in-memory posting: document id plus term frequency (the frequency
/// saturates at u16::MAX to keep postings compact).
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// Posting list that buffers postings in memory and spills them to a temporary
/// file once a term exceeds POSTING_FLUSH_THRESHOLD buffered entries.
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// Postings not yet spilled.
    memory: Vec<CompactPosting>,
    /// Byte offset of this term's spilled run, or -1 if nothing has been spilled.
    spill_offset: i64,
    /// Number of postings already written to the spill file.
    spill_count: u32,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_offset: -1,
            spill_count: 0,
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_offset: -1,
            spill_count: 0,
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // If the last posting belongs to the same document, merge the
        // frequencies instead of appending a duplicate entry.
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    /// Total postings for this term, including those already spilled.
    fn total_count(&self) -> usize {
        self.memory.len() + self.spill_count as usize
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}
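
// Minimal sanity sketch (added for illustration; the module name and values are
// assumptions): `add` merges repeated doc ids into one posting, and spilling is
// only requested once the buffer reaches POSTING_FLUSH_THRESHOLD.
#[cfg(all(test, feature = "native"))]
mod posting_list_sketch {
    use super::*;

    #[test]
    fn add_merges_consecutive_postings_for_the_same_doc() {
        let mut list = SpillablePostingList::new();
        list.add(1, 2);
        list.add(1, 3); // same document: frequencies accumulate
        list.add(2, 1);

        assert_eq!(list.memory.len(), 2);
        assert_eq!(list.memory[0].term_freq, 5);
        assert_eq!(list.total_count(), 2);
        assert!(!list.needs_spill());
    }
}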

/// Point-in-time statistics about a `SegmentBuilder`, useful for monitoring
/// memory use while indexing.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Unique (field, term) keys in the in-memory inverted index.
    pub unique_terms: usize,
    /// Postings currently buffered in memory (excludes spilled postings).
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Length of the flattened per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Minimum number of terms in any index shard.
    pub shard_min: usize,
    /// Maximum number of terms in any index shard.
    pub shard_max: usize,
    /// Average number of terms per index shard.
    pub shard_avg: usize,
    /// Bytes written to the spill file so far.
    pub spill_bytes: u64,
}

/// Configuration for `SegmentBuilder`.
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary store and spill files.
    pub temp_dir: PathBuf,
    /// Compression level used for the document store.
    pub compression_level: CompressionLevel,
    /// Number of compression worker threads.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Capacity hint for the posting map, split across the index shards.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Builds a single segment from a stream of documents.
///
/// Documents are appended to a temporary store file as they arrive; postings
/// accumulate in a sharded in-memory inverted index and are spilled to a
/// temporary file when a single term grows too large. `build` then merges
/// everything into the final segment files.
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interned term strings, sharded by a hash of the term bytes.
    term_interner: ShardedTermInterner,

    /// In-memory inverted index, sharded by term key.
    inverted_index: ShardedInvertedIndex,

    /// Temporary on-disk document store (length-prefixed serialized documents).
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for oversized posting lists, created lazily on first spill.
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Next document id to assign.
    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document token counts, one slot per indexed text field.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional precomputed WAND data.
    wand_data: Option<Arc<WandData>>,

    /// Scratch buffer: term frequencies for the document currently being indexed.
    local_tf_buffer: FxHashMap<ShardedSpur, u32>,

    /// Scratch buffer: terms whose posting lists need spilling after the current document.
    terms_to_spill_buffer: Vec<TermKey>,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Creates a builder, opening a temporary document store file in `config.temp_dir`.
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a dense slot to every indexed text field for the per-document
        // field-length table.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: ShardedTermInterner::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            config,
        })
    }

    /// Creates a builder that carries precomputed WAND data.
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Returns a snapshot of the builder's current memory and shard statistics.
    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

    /// Indexes a document, appends it to the temporary store, and returns its doc id.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() || !entry.unwrap().indexed {
                continue;
            }

            let entry = entry.unwrap();
            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Tokenizes a text field, accumulates per-term frequencies for this document,
    /// and merges them into the inverted index. Returns the token count.
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        // First pass: count term frequencies locally so each (term, doc) pair
        // touches the inverted index only once.
        self.local_tf_buffer.clear();

        for token in tokens {
            let term_spur = self.term_interner.get_or_intern(&token.text);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Second pass: merge the local counts into the sharded inverted index,
        // remembering which terms have grown large enough to spill.
        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Spill after the merge loop; the index-based loop avoids borrowing the
        // buffer across the `spill_posting_list` call.
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // Numeric values are indexed as a single synthetic term per value.
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Appends a length-prefixed serialized document to the temporary store file.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spills a term's buffered postings to the spill file and clears its buffer.
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        // Create the spill file lazily on the first spill.
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Remember where this term's spilled run starts.
        if posting.spill_offset < 0 {
            posting.spill_offset = self.spill_offset as i64;
        }

        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            // 4 bytes doc_id + 2 bytes term_freq; the layout is exercised by
            // `spill_record_sketch` at the bottom of this file.
            self.spill_offset += 6;
        }

        posting.spill_count += posting.memory.len() as u32;
        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4);

        Ok(())
    }

    /// Finalizes the segment: builds the term dictionary, postings, and store,
    /// writes them through the directory, and removes the temporary files.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Make sure all buffered temporary data is on disk before reading it back.
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup of the temporary files.
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Builds the term dictionary (SSTable) and serialized postings from the
    /// in-memory index plus any spilled postings.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // Sort terms by (field id, term bytes) so the SSTable receives keys in order.
        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map the spill file (if any) so spilled postings can be read back.
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take()); // close the writer before mapping
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Spilled postings come first: they hold the earliest doc ids.
            if spill_posting.spill_offset >= 0
                && let Some(ref mmap) = spill_mmap
            {
                let mut offset = spill_posting.spill_offset as usize;
                for _ in 0..spill_posting.spill_count {
                    let doc_id = u32::from_le_bytes([
                        mmap[offset],
                        mmap[offset + 1],
                        mmap[offset + 2],
                        mmap[offset + 3],
                    ]);
                    let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                    full_postings.push(doc_id, term_freq as u32);
                    offset += 6;
                }
            }

            // Then whatever is still buffered in memory.
            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            // Small posting lists are inlined into the term dictionary entry;
            // larger ones are serialized as block posting lists.
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Re-reads the temporary store file and rewrites it through the compressing
    /// `StoreWriter`, producing the final store bytes.
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Close the temporary store writer (flushed in `build`) by swapping in a
        // throwaway writer, so the file can be memory-mapped below.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        // Walk the length-prefixed documents written by `write_document_to_store`.
        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of temporary files if `build` was never called.
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}
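
// Minimal sanity sketch (added for illustration; the module name and values are
// assumptions): each spilled posting is a fixed six-byte little-endian record,
// a u32 doc_id followed by a u16 term_freq, matching the writer in
// `spill_posting_list` and the reader in `build_postings`.
#[cfg(all(test, feature = "native"))]
mod spill_record_sketch {
    use byteorder::{LittleEndian, WriteBytesExt};

    #[test]
    fn spill_record_is_six_bytes_and_round_trips() {
        let mut buf = Vec::new();
        buf.write_u32::<LittleEndian>(123).unwrap();
        buf.write_u16::<LittleEndian>(7).unwrap();
        assert_eq!(buf.len(), 6);

        let doc_id = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
        let term_freq = u16::from_le_bytes([buf[4], buf[5]]);
        assert_eq!((doc_id, term_freq), (123, 7));
    }
}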