#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

#[cfg(feature = "native")]
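/// Size of the in-memory write buffer for the temporary document store file.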
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[cfg(feature = "native")]
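/// Number of shards in the in-memory inverted index. Keeping this a power of
/// two lets `TermKey::shard` select a shard with a bitwise AND instead of a
/// modulo.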
const NUM_INDEX_SHARDS: usize = 64;

#[cfg(feature = "native")]
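/// Composite key for the inverted index: a field id paired with an interned term.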
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
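    /// Picks a shard from the low bits of the interned term id; relies on
    /// NUM_INDEX_SHARDS being a power of two.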
    #[inline]
    fn shard(&self) -> usize {
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

#[cfg(feature = "native")]
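/// In-memory inverted index, split across NUM_INDEX_SHARDS hash maps keyed by
/// (field, term); entries are routed to a shard by `TermKey::shard`.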
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, PostingListBuilder>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
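    /// Returns the posting-list builder for `key`, inserting an empty one on
    /// first use.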
    fn get_or_insert(&mut self, key: TermKey) -> &mut PostingListBuilder {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(PostingListBuilder::new)
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    fn total_postings_in_memory(&self) -> usize {
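        // Sums the buffered posting counts across every shard.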
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.postings.len())
            .sum()
    }

    fn shard_stats(&self) -> (usize, usize, usize) {
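        // (min, max, avg) number of unique terms per shard, for diagnostics.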
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}

#[cfg(feature = "native")]
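/// Iterator over every (term, posting list) entry across all shards, visiting
/// shards in order.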
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, PostingListBuilder>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, PostingListBuilder>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a PostingListBuilder);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            // Current shard exhausted (or iteration not started): advance to
            // the next shard, or stop once every shard has been drained.
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a PostingListBuilder);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

#[cfg(feature = "native")]
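/// One in-memory posting: a document id plus a term frequency capped at
/// `u16::MAX` to keep the struct compact.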
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

#[cfg(feature = "native")]
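/// Append-only builder for a single term's posting list.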
struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

#[cfg(feature = "native")]
impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    #[inline]
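    /// Records `term_freq` occurrences of the term in `doc_id`. Doc ids arrive
    /// in ascending order, so repeated calls for the same document merge into
    /// the last entry.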
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Clamp to the u16 range up front so oversized frequencies saturate
        // instead of being silently truncated.
        let tf = term_freq.min(u16::MAX as u32) as u16;
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(tf);
            return;
        }
        self.postings.push(CompactPosting { doc_id, term_freq: tf });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

#[cfg(feature = "native")]
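/// Snapshot of a builder's in-memory state.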
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Unique (field, term) keys across all shards.
    pub unique_terms: usize,
    /// Total postings currently buffered in memory.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Entries in the per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Fewest terms in any single shard.
    pub shard_min: usize,
    /// Most terms in any single shard.
    pub shard_max: usize,
    /// Mean terms per shard.
    pub shard_avg: usize,
}

#[cfg(feature = "native")]
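/// Tuning parameters for segment building.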
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary document store spill file.
    pub temp_dir: PathBuf,
    /// Compression level for the final document store.
    pub compression_level: CompressionLevel,
    /// Worker threads used when compressing the store.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner (not currently applied in `new`).
    pub interner_capacity: usize,
    /// Total capacity hint for the posting maps, split evenly across shards.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

#[cfg(feature = "native")]
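/// Builds a single immutable segment from a stream of documents.
///
/// Documents are indexed into a sharded in-memory inverted index while their
/// stored form is spilled to a temporary file. `build` then sorts the terms,
/// serializes the term dictionary and postings, re-reads the spill file, and
/// writes the compressed document store.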
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings into compact `Spur` ids.
    term_interner: Rodeo,

    /// Sharded map from (field, term) to an in-memory posting list.
    inverted_index: ShardedInvertedIndex,

    /// Buffered writer for the temporary on-disk document store.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Doc id that will be assigned to the next added document.
    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat table of token counts: one slot per indexed text field per
    /// document, addressed as `doc_id * num_indexed_fields + slot`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional precomputed WAND data attached to this builder.
    wand_data: Option<Arc<WandData>>,

    /// Scratch map of per-document term frequencies, reused between documents.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer reused while normalizing tokens.
    token_buffer: String,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
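    /// Creates a builder that spills stored documents to a temp file in
    /// `config.temp_dir` and assigns a dense slot to every indexed text field.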
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        // Spill stored documents to a uniquely named temp file; `build` (or
        // `Drop` as a fallback) removes it.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot in the per-document
        // field-length table.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    /// Like `new`, but attaches precomputed WAND data to the builder.
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    /// Registers the tokenizer to use for `field`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Number of documents added so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Snapshot of the builder's current in-memory state.
    pub fn stats(&self) -> SegmentBuilderStats {
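        // Combine shard balance numbers with the global counters.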
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
        }
    }

    /// Indexes `doc`, spills its stored form to the temp file, and returns the
    /// assigned doc id.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
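        // Doc ids are dense and assigned in insertion order.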
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one field-length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                // Numeric values are indexed as exact-match terms over their
                // raw bit patterns; ordering is not preserved.
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
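        // Fast inline tokenizer: split on whitespace, keep alphanumeric chars,
        // and lowercase them; registered boxed tokenizers are not consulted on
        // this path. Frequencies are first accumulated per document in
        // `local_tf_buffer`, so the sharded index is touched only once per
        // unique term.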
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

    /// Indexes a numeric value for `field` as the exact-match term
    /// `__num_<value>`, with a term frequency of 1.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
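        // Numeric terms share the interner and index with text terms, using a
        // reserved "__num_" prefix to keep them distinct.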
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Appends `doc` to the temporary store as a length-prefixed record.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Finalizes the segment: serializes the term dictionary, postings, and
    /// compressed document store, writes all files to `dir`, and returns the
    /// segment metadata.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
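        // Flush buffered records so the temp store can be re-read in full.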
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // The temp store has been fully consumed; remove it now rather than
        // waiting for Drop.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

    /// Sorts all in-memory terms and serializes the term dictionary (as an
    /// SSTable) and the posting lists into two byte buffers.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
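        // A BTreeMap yields keys in ascending byte order, which is the order
        // the SSTable writer expects.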
        use std::collections::BTreeMap;

        let mut sorted_terms: BTreeMap<Vec<u8>, &PostingListBuilder> = BTreeMap::new();

        // Composite key: the field id's little-endian bytes followed by the
        // raw term bytes.
        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, posting_builder) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(posting_builder.len());
            for p in &posting_builder.postings {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            // Short lists are inlined into the term dictionary entry itself;
            // longer ones are block-encoded into the postings buffer.
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Streams documents back out of the temporary store file via mmap and
    /// re-encodes them with the parallel compressing store writer.
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
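        // Each record is a u32 length prefix followed by the serialized
        // document bytes written by `write_document_to_store`.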
        use super::store::EagerParallelStoreWriter;

        // Swap the writer out so its handle to the temp file is released
        // before mapping it. NOTE: the /dev/null placeholder assumes a
        // Unix-like platform.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        // SAFETY: the temp file is private to this builder and no longer has
        // an open writer, so the mapping remains valid while it is read.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            self.config.num_compression_threads,
            self.config.compression_level,
        );

        let mut offset = 0usize;
        while offset < mmap.len() {
            // A truncated length prefix marks the end of valid data.
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            // Skip records that fail to deserialize instead of aborting the build.
            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

// Fallback cleanup: remove the temporary store file if `build` was never run
// (errors are ignored; `build` may already have deleted it).
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}