//! Segment builder (native targets only).
//!
//! Buffers postings in a sharded in-memory inverted index, spills oversized
//! posting lists to a temporary file, and streams stored documents through a
//! temporary store file before the final segment files are written.

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

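/// Maximum number of in-memory postings per term before the list is spilled to disk.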
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 10_000;

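/// Write-buffer capacity (in bytes) for the temporary store and spill files.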
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[cfg(feature = "native")]
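/// Number of shards in the in-memory inverted index. Must be a power of two
/// so `TermKey::shard` can select a shard with a bitmask.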
const NUM_INDEX_SHARDS: usize = 64;

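/// Key for a posting list: a field id paired with an interned term.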
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    #[inline]
    fn shard(&self) -> usize {
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

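/// In-memory inverted index split across `NUM_INDEX_SHARDS` hash maps,
/// with each `TermKey` routed to a shard by `TermKey::shard`.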
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.memory.len())
            .sum()
    }

    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}

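/// Iterator over every `(TermKey, SpillablePostingList)` entry across all shards.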
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

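/// A single in-memory posting: document id plus a 16-bit term frequency.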
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

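/// Posting list that buffers postings in memory and appends overflowing runs
/// to a shared spill file, recording each run so it can be replayed at build time.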
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// Postings currently buffered in memory.
    memory: Vec<CompactPosting>,
    /// Runs already written to the spill file, as (byte offset, posting count)
    /// pairs. Runs from different terms interleave in the file, so each run is
    /// tracked individually rather than assuming one contiguous region.
    spill_chunks: Vec<(u64, u32)>,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_chunks: Vec::new(),
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_chunks: Vec::new(),
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Clamp once so both the merge and push paths saturate at u16::MAX.
        let term_freq = term_freq.min(u16::MAX as u32) as u16;
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(term_freq);
            return;
        }
        self.memory.push(CompactPosting { doc_id, term_freq });
    }

    /// Total postings for this term across spilled runs and the in-memory buffer.
    fn total_count(&self) -> usize {
        let spilled: usize = self.spill_chunks.iter().map(|&(_, n)| n as usize).sum();
        self.memory.len() + spilled
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

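/// Point-in-time statistics about an in-progress segment build.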
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Unique (field, term) keys in the in-memory index.
    pub unique_terms: usize,
    /// Postings currently buffered in memory across all terms.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Entries in the per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Fewest terms in any shard.
    pub shard_min: usize,
    /// Most terms in any shard.
    pub shard_max: usize,
    /// Average terms per shard.
    pub shard_avg: usize,
    /// Bytes written to the spill file so far.
    pub spill_bytes: u64,
}

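/// Configuration for a `SegmentBuilder`.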
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary store and spill files.
    pub temp_dir: PathBuf,
    /// Compression level for the document store.
    pub compression_level: CompressionLevel,
    /// Number of threads available for compression work.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Capacity hint for the posting map, spread across the index shards.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

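/// Builds a single segment from a stream of documents.
///
/// Postings accumulate in a sharded in-memory index; posting lists that grow
/// past `POSTING_FLUSH_THRESHOLD` are spilled to a temporary file, and stored
/// documents are streamed to a temporary store file. `build` merges everything
/// into the final segment files.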
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` ids.
    term_interner: Rodeo,

    /// Sharded in-memory inverted index.
    inverted_index: ShardedInvertedIndex,

    /// Temporary on-disk store of length-prefixed serialized documents.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for oversized posting lists, created lazily on first spill.
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Next document id to assign.
    next_doc_id: DocId,

    /// Per-field token statistics.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: `num_indexed_fields` slots per document.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    wand_data: Option<Arc<WandData>>,

    /// Scratch buffer of per-document term frequencies.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer of terms that need spilling after the current document.
    terms_to_spill_buffer: Vec<TermKey>,

    /// Scratch buffer for building lowercased tokens.
    token_buffer: String,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Create a new builder, opening a temporary store file in `config.temp_dir`.
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a dense slot to every indexed text field for the per-document
        // field-length table.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    /// Create a builder that carries precomputed WAND data.
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    /// Register a tokenizer for a field.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Number of documents added so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Snapshot of the builder's current memory usage and shard balance.
    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

    /// Add a document and return its local doc id.
    ///
    /// Indexed text fields are tokenized and posted to the inverted index,
    /// numeric fields are indexed as exact-match terms, and the document is
    /// appended to the temporary store file.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Tokenize a text field and add its postings to the inverted index.
    ///
    /// Tokens are produced by splitting on whitespace, keeping alphanumeric
    /// characters, and lowercasing. Term frequencies are aggregated per
    /// document before postings are inserted; any posting list that reaches
    /// `POSTING_FLUSH_THRESHOLD` is spilled to disk. Returns the token count.
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Spill by index so no borrow of `terms_to_spill_buffer` is held across
        // the `&mut self` call.
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

    /// Index a numeric value as an exact-match term of the form `__num_{value}`.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Append a length-prefixed serialized document to the temporary store file.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Flush a term's in-memory postings to the spill file, recording the run's
    /// offset and length so `build_postings` can replay it later.
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Runs from different terms interleave in the spill file, so record this
        // run's own (offset, count) pair rather than assuming contiguity.
        posting
            .spill_chunks
            .push((self.spill_offset, posting.memory.len() as u32));

        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4-byte doc id + 2-byte term frequency
        }

        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4);

        Ok(())
    }

    /// Finalize the segment: build the term dictionary, postings, and document
    /// store, write them through `dir`, and remove the temporary files.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Build the term dictionary and postings blocks.
    ///
    /// Terms are sorted by (field id, term bytes). For each term, spilled runs
    /// are replayed from the memory-mapped spill file, the in-memory postings
    /// are appended, and the merged list is either inlined into the term
    /// dictionary or serialized as a block posting list.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Map the spill file, if any, so spilled runs can be read back.
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take());
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Replay spilled runs first; they were written in ascending doc id order.
            if let Some(ref mmap) = spill_mmap {
                for &(chunk_offset, chunk_count) in &spill_posting.spill_chunks {
                    let mut offset = chunk_offset as usize;
                    for _ in 0..chunk_count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq =
                            u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build the final document store by streaming the temporary store file
    /// back through a memory map and re-encoding each document with the
    /// configured compression level.
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Swap in a throwaway writer so the temporary store file is flushed and
        // its handle released before it is memory-mapped.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

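/// Best-effort cleanup of the temporary files if the builder is dropped
/// without `build` having removed them.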
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}