mod config;
mod dense_build;
mod posting;
mod postings_build;
mod sparse_build;
mod store_build;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};

use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

/// Buffer size for the temporary on-disk document store.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted index:
/// key + posting-list builder + hash-map entry overhead.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated per-string interner overhead, beyond the string bytes themselves.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of inserting a brand-new term into the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;

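/// In-memory builder that accumulates documents for a single segment and
/// streams the finished, immutable segment files out through a `Directory`.
///
/// A minimal usage sketch; `schema`, `config`, `doc`, `dir`, and `segment_id`
/// are hypothetical values assumed to be in scope:
///
/// ```ignore
/// let mut builder = SegmentBuilder::new(schema, config)?;
/// let doc_id = builder.add_document(doc)?;
/// let meta = builder.build(&dir, segment_id, None).await?;
/// ```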
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interned term strings, shared across all fields.
    term_interner: Rodeo,

    /// Term -> posting list under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk store for serialized documents.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: `num_indexed_fields` slots per doc.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term frequencies for the text value currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: encoded positions for the text value currently being indexed.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for normalized tokens.
    token_buffer: String,

    /// Scratch buffer for formatting numeric terms.
    numeric_buffer: String,

    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Term -> position postings, for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Position mode per field id, for fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Element ordinal counters, reset at the start of each document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally maintained estimate of heap usage, in bytes.
    estimated_memory: usize,

    doc_serialize_buffer: Vec<u8>,
}

impl SegmentBuilder {
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a contiguous slot to each indexed text field so per-document
        // field lengths can live in one flat vector.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
        })
    }

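    /// Registers a custom tokenizer for `field`. Fields without one fall back
    /// to the built-in whitespace-split, alphanumeric, lowercasing tokenizer.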
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

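    /// Returns the next element ordinal for a multi-valued field within the
    /// current document (0 for the first value, 1 for the second, ...).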
    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
        let counter = self.current_element_ordinal.entry(field_id).or_insert(0);
        let ordinal = *counter;
        *counter += 1;
        ordinal
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

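    /// Builds a point-in-time snapshot of builder statistics. The memory
    /// figures are estimates derived from container capacities plus assumed
    /// per-entry overheads, not exact allocator measurements.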
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry bookkeeping overhead assumed for hash maps.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // The interner does not expose its arena size, so assume an average
        // term length plus a fixed per-string overhead.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

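    /// Indexes `doc` and appends it to the temporary store, returning its
    /// segment-local `DocId`. Fields that are neither indexed nor dense
    /// vectors are skipped for indexing; the document is still serialized to
    /// the store afterwards.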
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one field-length slot per indexed text field for this doc.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * size_of::<u32>();

        // Element ordinals restart for every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense vectors are handled below even when only stored;
            // everything else must be indexed.
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = self.next_element_ordinal(field.0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

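    /// Tokenizes one text value and merges its terms into the inverted index
    /// (and the position index when enabled). Returns the token count.
    ///
    /// Positions are packed into a single `u32`: with `PositionMode::Full`,
    /// the element ordinal occupies the bits above bit 20 and the token
    /// position the low 20 bits, so ordinal 1, token 3 encodes as
    /// `(1 << 20) | 3`.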
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reuse the scratch buffers; clearing keeps their allocations.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Fallback tokenizer: split on whitespace, keep alphanumeric
            // characters, lowercase.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the per-document buffers into the shared indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }

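    /// Indexes a numeric value as an exact-match term of the form
    /// `__num_{value}`. Callers encode i64 values by casting to u64 and f64
    /// values via their bit pattern, so lookups must apply the same encoding.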
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::fmt::Write;

        self.numeric_buffer.clear();
        write!(self.numeric_buffer, "__num_{}", value).unwrap();
        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += NEW_TERM_OVERHEAD;
        }
        if is_new_string {
            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
        }

        Ok(())
    }

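    /// Buffers a dense vector for `field`. The first vector seen for a field
    /// fixes its dimensionality; later vectors must match it.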
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

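    /// Buffers sparse-vector entries for `field` as per-dimension postings,
    /// dropping weights whose magnitude falls below the schema's configured
    /// `weight_threshold` (0.0 when unset).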
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory +=
                    size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

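    /// Appends one document to the temporary store as a length-prefixed
    /// record: a little-endian u32 byte count followed by the serialized doc.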
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }

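    /// Consumes the builder and writes the finished segment through `dir`.
    ///
    /// Positions are built first so the term dictionary can embed their
    /// offsets; postings, the document store, dense vectors, and sparse
    /// vectors are then built on parallel rayon tasks before the segment
    /// metadata is written.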
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Build positions first: the postings build consumes their offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings_build::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        // Run the four independent builds in parallel.
        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        postings_build::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        store_build::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            dense_build::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            sparse_build::build_sparse_streaming(
                                &mut sparse_vectors,
                                schema,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                )
            },
        );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}