mod config;
mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
mod postings;
mod sparse;
mod store;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};

use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use dense::DenseVectorBuilder;
use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
use sparse::SparseVectorBuilder;

/// Buffer size for the temporary document store writer (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of a brand-new term in the inverted index: the key,
/// the posting-list builder, plus an allowance for hash-map entry overhead.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Approximate per-string bookkeeping cost of interning (key plus arena
/// bookkeeping), excluding the string bytes themselves.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of a brand-new term in the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;

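/// Builds a single index segment in memory.
///
/// Documents are indexed into in-memory posting lists while their stored form
/// is spooled to a temporary file; [`SegmentBuilder::build`] then streams
/// everything to the target directory in segment file format.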
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner that deduplicates term strings.
    term_interner: Rodeo,

    /// Main inverted index: term -> posting list under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk document store, removed when the builder is dropped.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Next document id to assign; doubles as the running document count.
    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: one slot per indexed text field.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch buffer: term frequencies for the field currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer: encoded positions for the field currently being indexed.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for lowercased tokens.
    token_buffer: String,

    /// Scratch buffer for formatting numeric terms.
    numeric_buffer: String,

    /// Dense-vector builders, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse-vector builders, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Position mode per field id, for fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-document element ordinal counters; cleared for each new document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap memory used by the builder.
    estimated_memory: usize,

    /// Reusable buffer for serializing documents into the store.
    doc_serialize_buffer: Vec<u8>,
}

impl SegmentBuilder {
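    /// Creates an empty builder that spools stored documents to a temp file
    /// under `config.temp_dir`.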
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        // Spool documents to a uniquely named temp file until the segment is built.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot in `doc_field_lengths`
        // and record which fields track positions.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
        })
    }

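    /// Registers a custom tokenizer for `field`, replacing the default
    /// whitespace tokenization.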
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

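    /// Returns the next element ordinal for a multi-valued field within the
    /// current document, starting at zero.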
    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
        let counter = self.current_element_ordinal.entry(field_id).or_insert(0);
        let ordinal = *counter;
        *counter += 1;
        ordinal
    }

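    /// Number of documents added so far.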
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

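    /// Cheap running estimate of heap usage, maintained incrementally; see
    /// [`SegmentBuilder::stats`] for a detailed breakdown.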
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

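    /// Total number of distinct (field, dimension) sparse posting lists in memory.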
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

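    /// Recomputes detailed memory statistics from the builder's current state.
    /// Sizes are estimates based on container capacities plus fixed per-entry
    /// overheads.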
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        // Stack size of a Vec header (pointer + length + capacity).
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry bookkeeping overhead for the hash maps.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Assume an average interned term length of 8 bytes.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

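    /// Indexes one document and appends its stored form to the temp store.
    /// Returns the document's id within this segment.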
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals restart at zero for every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense vectors may be stored without being indexed; everything
            // else is skipped unless the field is indexed.
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = self.next_element_ordinal(field.0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

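    /// Tokenizes `text` and merges its terms into the inverted index (and the
    /// position index, if enabled for the field). Positions are encoded as
    /// `(element_ordinal << 20) | token_position`, depending on the field's
    /// `PositionMode`. Returns the token count for field-length statistics.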
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset per-field scratch buffers, keeping their allocated capacity.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Use the field's custom tokenizer if one was registered; otherwise
        // fall back to whitespace splitting with lowercased alphanumerics.
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the accumulated term frequencies (and positions) into the
        // segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }

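    /// Indexes a numeric value as the exact-match term `__num_{value}`.
    /// Callers map `i64`/`f64` values to `u64` bit patterns first.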
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::fmt::Write;

        self.numeric_buffer.clear();
        write!(self.numeric_buffer, "__num_{}", value).unwrap();
        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += NEW_TERM_OVERHEAD;
        }
        if is_new_string {
            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
        }

        Ok(())
    }

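    /// Buffers a dense vector; all vectors in a field must share the same
    /// dimensionality.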
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // The first vector fixes the field's dimensionality.
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

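    /// Buffers a sparse vector as per-dimension postings, skipping entries
    /// whose weight falls below the field's configured threshold.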
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        // Entries with |weight| below the field's threshold are dropped.
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                // New dimension: account for the key, the Vec header, and map overhead.
                self.estimated_memory +=
                    size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

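    /// Serializes `doc` and appends it to the temp store as a length-prefixed
    /// (u32, little-endian) record.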
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }

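    /// Consumes the builder and writes all segment files to `dir`: positions
    /// first (their offsets feed the postings), then term dictionary,
    /// postings, document store, and dense/sparse vectors in parallel.
    /// Returns the segment metadata.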
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Make sure every buffered document record hits the temp file.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are written first because the postings writer needs
        // their file offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so they can be handed to
        // the parallel build tasks below.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        // Build postings + store and dense + sparse vectors in parallel.
        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        postings::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        store::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            dense::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            sparse::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
                        }
                        Ok(())
                    },
                )
            },
        );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // The temp store has been fully consumed; remove it eagerly.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
}

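/// Removes the temporary store file even if `build` was never run.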
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup; the file may already be gone after `build`.
        let _ = std::fs::remove_file(&self.store_path);
    }
}