#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

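/// Maximum number of postings a single term buffers in memory before it is
/// spilled to the temporary spill file.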
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 1_000;

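/// Buffer size for the temporary store and spill file writers.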
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

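/// Number of shards in the in-memory inverted index (kept a power of two so
/// `TermKey::shard` can use a bitmask).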
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

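/// Key identifying a posting list: the field id plus the interned term.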
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
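    /// Selects the shard for this key from the interned term's numeric id;
    /// `NUM_INDEX_SHARDS` is a power of two, so masking is equivalent to modulo.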
    #[inline]
    fn shard(&self) -> usize {
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

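/// In-memory inverted index split across `NUM_INDEX_SHARDS` hash maps; keys
/// are routed to a shard by `TermKey::shard`.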
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.memory.len())
            .sum()
    }

    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}

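/// Iterator that walks every shard in turn, yielding all
/// (`TermKey`, posting list) pairs of a `ShardedInvertedIndex`.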
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

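/// A single in-memory posting: doc id plus a term frequency saturated to u16.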
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

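/// A posting list that buffers postings in memory; once the buffer reaches
/// `POSTING_FLUSH_THRESHOLD` the caller spills it to the shared spill file,
/// recording the starting offset (`spill_offset`, -1 until the first spill)
/// and the number of spilled postings.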
#[cfg(feature = "native")]
struct SpillablePostingList {
    memory: Vec<CompactPosting>,
    spill_offset: i64,
    spill_count: u32,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_offset: -1,
            spill_count: 0,
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_offset: -1,
            spill_count: 0,
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn total_count(&self) -> usize {
        self.memory.len() + self.spill_count as usize
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

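/// Point-in-time snapshot of builder size: document count, unique terms,
/// in-memory postings, interner size, per-shard balance, and bytes spilled
/// to disk.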
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    pub num_docs: u32,
    pub unique_terms: usize,
    pub postings_in_memory: usize,
    pub interned_strings: usize,
    pub doc_field_lengths_size: usize,
    pub shard_min: usize,
    pub shard_max: usize,
    pub shard_avg: usize,
    pub spill_bytes: u64,
}

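/// Tuning knobs for segment building: temp-file directory, store compression
/// level and thread count, and initial capacities for the term interner and
/// the posting maps.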
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    pub temp_dir: PathBuf,
    pub compression_level: CompressionLevel,
    pub num_compression_threads: usize,
    pub interner_capacity: usize,
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

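/// Streaming builder for a single immutable segment.
///
/// Documents are appended to a temporary on-disk store file as they arrive,
/// while their postings accumulate in a sharded in-memory inverted index;
/// oversized posting lists overflow to a temporary spill file. `build`
/// merges everything into the final term dictionary, postings, store, and
/// meta files.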
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    term_interner: Rodeo,

    inverted_index: ShardedInvertedIndex,

    store_file: BufWriter<File>,
    store_path: PathBuf,

    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    next_doc_id: DocId,

    field_stats: FxHashMap<u32, FieldStats>,

    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    wand_data: Option<Arc<WandData>>,

    local_tf_buffer: FxHashMap<Spur, u32>,

    terms_to_spill_buffer: Vec<TermKey>,
}

341
342#[cfg(feature = "native")]
343impl SegmentBuilder {
344 pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
346 let segment_id = uuid::Uuid::new_v4();
347 let store_path = config
348 .temp_dir
349 .join(format!("hermes_store_{}.tmp", segment_id));
350 let spill_path = config
351 .temp_dir
352 .join(format!("hermes_spill_{}.tmp", segment_id));
353
354 let store_file = BufWriter::with_capacity(
355 SPILL_BUFFER_SIZE,
356 OpenOptions::new()
357 .create(true)
358 .write(true)
359 .truncate(true)
360 .open(&store_path)?,
361 );
362
363 let mut num_indexed_fields = 0;
365 let mut field_to_slot = FxHashMap::default();
366 for (field, entry) in schema.fields() {
367 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
368 field_to_slot.insert(field.0, num_indexed_fields);
369 num_indexed_fields += 1;
370 }
371 }
372
373 Ok(Self {
374 schema,
375 tokenizers: FxHashMap::default(),
376 term_interner: Rodeo::new(),
377 inverted_index: ShardedInvertedIndex::new(
378 config.posting_map_capacity / NUM_INDEX_SHARDS,
379 ),
380 store_file,
381 store_path,
382 spill_file: None,
383 spill_path,
384 spill_offset: 0,
385 next_doc_id: 0,
386 field_stats: FxHashMap::default(),
387 doc_field_lengths: Vec::new(),
388 num_indexed_fields,
389 field_to_slot,
390 wand_data: None,
391 local_tf_buffer: FxHashMap::default(),
392 terms_to_spill_buffer: Vec::new(),
393 config,
394 })
395 }
396
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

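    /// Indexes a single document: text fields are tokenized into the
    /// inverted index, numeric fields become synthetic terms, per-field
    /// token counts are recorded, and the serialized document is appended
    /// to the temporary store file. Returns the doc id assigned to it.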
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() || !entry.unwrap().indexed {
                continue;
            }

            let entry = entry.unwrap();
            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

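    /// Tokenizes `text` (using the field's tokenizer, or `LowercaseTokenizer`
    /// by default), aggregates term frequencies for this document in
    /// `local_tf_buffer`, merges them into the sharded inverted index, and
    /// spills any posting list that reached `POSTING_FLUSH_THRESHOLD`.
    /// Returns the number of tokens produced.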
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        self.local_tf_buffer.clear();

        for token in tokens {
            let term_spur = self.term_interner.get_or_intern(&token.text);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

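    /// Indexes a numeric value as the synthetic term `__num_{value}`; callers
    /// pass i64 values cast to u64 and f64 values as their bit pattern.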
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

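    /// Appends a term's in-memory postings to the spill file as 6-byte
    /// records (little-endian u32 doc id + u16 term frequency), creating the
    /// spill file lazily, then clears and shrinks the in-memory buffer.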
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        if posting.spill_offset < 0 {
            posting.spill_offset = self.spill_offset as i64;
        }

        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6;
        }

        posting.spill_count += posting.memory.len() as u32;
        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4);

        Ok(())
    }

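    /// Finalizes the segment: flushes the temporary files, builds the term
    /// dictionary, postings, and document store, writes them to `dir` along
    /// with the segment metadata, and removes the temporary files.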
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

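    /// Builds the term dictionary and postings buffers. Terms are ordered by
    /// key (little-endian field id followed by the term bytes); spilled
    /// postings are read back from the memory-mapped spill file and merged
    /// with the in-memory tail, then stored either inline in the `TermInfo`
    /// or as a serialized block posting list.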
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take());
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            if spill_posting.spill_offset >= 0
                && let Some(ref mmap) = spill_mmap
            {
                let mut offset = spill_posting.spill_offset as usize;
                for _ in 0..spill_posting.spill_count {
                    let doc_id = u32::from_le_bytes([
                        mmap[offset],
                        mmap[offset + 1],
                        mmap[offset + 2],
                        mmap[offset + 3],
                    ]);
                    let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                    full_postings.push(doc_id, term_freq as u32);
                    offset += 6;
                }
            }

            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

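    /// Rebuilds the compressed document store by dropping the temporary
    /// store writer, memory-mapping the temp file, and feeding each
    /// length-prefixed document through a `StoreWriter` at the configured
    /// compression level.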
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}