1pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16pub(crate) mod graph_bisection;
17mod postings;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23#[cfg(feature = "native")]
24use std::fs::{File, OpenOptions};
25#[cfg(feature = "native")]
26use std::io::BufWriter;
27use std::io::Write;
28use std::mem::size_of;
29#[cfg(feature = "native")]
30use std::path::PathBuf;
31
32use hashbrown::HashMap;
33use rustc_hash::FxHashMap;
34
35#[cfg(feature = "native")]
37use lasso::{Rodeo, Spur};
38
#[cfg(not(feature = "native"))]
pub(crate) mod simple_interner {
    //! Minimal stand-in for `lasso::Rodeo`/`lasso::Spur` on targets where
    //! the `native` feature (and thus the `lasso` dependency) is absent.
    //!
    //! The previous implementation kept `&'static str` keys pointing into
    //! `Box<str>` allocations owned by the same struct via an undocumented
    //! `unsafe` cast. This version shares each string between the id table
    //! and the lookup map with `Arc<str>`: still one string allocation per
    //! distinct term, but no `unsafe` and no self-referential borrows.

    use std::collections::HashMap;
    use std::sync::Arc;

    /// Opaque key for an interned string, mirroring `lasso::Spur`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Spur(u32);

    /// Simple string interner: every distinct string is stored once and is
    /// identified by the dense `u32` index at which it was first interned.
    pub struct Rodeo {
        /// id -> string lookup; the index is the `Spur` payload.
        strings: Vec<Arc<str>>,
        /// string -> id lookup; shares the same `Arc<str>` allocations.
        map: HashMap<Arc<str>, u32>,
    }

    impl Rodeo {
        /// Creates an empty interner.
        pub fn new() -> Self {
            Self {
                strings: Vec::new(),
                map: HashMap::new(),
            }
        }

        /// Returns the key for `key` if it has already been interned.
        pub fn get(&self, key: &str) -> Option<Spur> {
            self.map.get(key).map(|&id| Spur(id))
        }

        /// Interns `key`, returning the existing key when already present.
        pub fn get_or_intern(&mut self, key: &str) -> Spur {
            if let Some(&id) = self.map.get(key) {
                return Spur(id);
            }
            let id = self.strings.len() as u32;
            let shared: Arc<str> = Arc::from(key);
            // Cheap refcount bump; the string bytes are allocated once.
            self.strings.push(Arc::clone(&shared));
            self.map.insert(shared, id);
            Spur(id)
        }

        /// Resolves a key back to its string.
        ///
        /// Panics if `spur` did not originate from this interner.
        pub fn resolve(&self, spur: &Spur) -> &str {
            &self.strings[spur.0 as usize]
        }

        /// Number of distinct interned strings.
        pub fn len(&self) -> usize {
            self.strings.len()
        }
    }
}
91
92#[cfg(not(feature = "native"))]
93use simple_interner::{Rodeo, Spur};
94
95use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
96use std::sync::Arc;
97
98use crate::directories::{Directory, DirectoryWriter};
99use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
100use crate::tokenizer::BoxedTokenizer;
101use crate::{DocId, Result};
102
103use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
104use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
105use sparse::SparseVectorBuilder;
106
/// Capacity of the store spill buffer: `BufWriter` size on native targets,
/// initial in-memory `Vec` capacity otherwise.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of the first posting for a brand-new term:
/// hash-map key + posting-list builder + ~24 bytes of map bookkeeping.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated per-string interner cost beyond the string bytes themselves
/// (key plus rough arena/table bookkeeping).
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Like `NEW_TERM_OVERHEAD`, but for a new term in the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
120
/// In-memory accumulator for a single segment: documents are added one at a
/// time, and all index structures (postings, positions, dense/binary/sparse
/// vectors, fast fields and the document store) are flushed to immutable
/// files by `SegmentBuilder::build`.
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field tokenizer overrides; text fields without one fall back to
    /// the built-in lowercase/alphanumeric tokenization.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so index keys carry a compact `Spur`.
    term_interner: Rodeo,

    /// Main inverted index: (field, term) -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temp spill file receiving length-prefixed serialized documents.
    #[cfg(feature = "native")]
    store_file: BufWriter<File>,
    /// Path of the spill file, removed in `build()` and `Drop`.
    #[cfg(feature = "native")]
    store_path: PathBuf,
    /// Non-native targets buffer serialized documents in memory instead.
    #[cfg(not(feature = "native"))]
    store_buffer: Vec<u8>,

    /// Next doc id to assign; doubles as the current document count.
    next_doc_id: DocId,

    /// Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat `num_docs x num_indexed_fields` matrix of per-document token
    /// counts; columns are assigned via `field_to_slot`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> column slot inside `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term frequencies for the document being indexed (reused).
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: encoded positions per term for the current document
    /// (values are cleared between documents; allocations are reused).
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for building normalized tokens.
    token_buffer: String,

    /// Scratch buffer for formatting synthetic numeric terms.
    numeric_buffer: String,

    /// Per-field dense (f32) vector accumulators.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Per-field binary (bit-packed) vector accumulators.
    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,

    /// Per-field sparse vector accumulators.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Field id -> configured position mode (present only for fields that
    /// record positions).
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Ordinal of the next multi-value element per field for the document
    /// currently being added; cleared at the start of each document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running rough estimate of heap usage, updated on every insert.
    estimated_memory: usize,

    /// Scratch buffer for document serialization.
    doc_serialize_buffer: Vec<u8>,

    /// Columnar (fast-field) writers, keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
203
204impl SegmentBuilder {
    /// Creates a builder for a fresh segment.
    ///
    /// On native targets this opens a uniquely-named temporary file that
    /// spills serialized documents to disk (cleaned up in `build()` and in
    /// `Drop`); on other targets documents are buffered in memory.
    ///
    /// # Errors
    /// Returns an I/O error if the temp store file cannot be created.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        #[cfg(feature = "native")]
        let (store_file, store_path) = {
            // Random UUID so concurrent builders never collide on the path.
            let segment_id = uuid::Uuid::new_v4();
            let store_path = config
                .temp_dir
                .join(format!("hermes_store_{}.tmp", segment_id));
            let store_file = BufWriter::with_capacity(
                STORE_BUFFER_SIZE,
                OpenOptions::new()
                    .create(true)
                    .write(true)
                    .truncate(true)
                    .open(&store_path)?,
            );
            (store_file, store_path)
        };

        let registry = crate::tokenizer::TokenizerRegistry::new();
        // Assign each indexed text field a dense column slot so per-document
        // field lengths can live in one flat Vec (see `doc_field_lengths`).
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                // Resolve the schema-named tokenizer now; unknown names fall
                // back to the default tokenization path silently.
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        // Pre-create a columnar writer for every `fast` field of a supported
        // type; unsupported field types are skipped.
        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            #[cfg(feature = "native")]
            store_file,
            #[cfg(feature = "native")]
            store_path,
            #[cfg(not(feature = "native"))]
            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            binary_dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
309
    /// Registers (or replaces) the tokenizer used for `field`, overriding
    /// any tokenizer resolved from the schema in `new()`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
313
314 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
317 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
318 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
319 ordinal
320 }
321
    /// Number of documents added so far (the next doc id to be assigned).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
325
    /// Running estimate of heap bytes used by the builder, maintained
    /// incrementally as documents are added (cheap; no recomputation).
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
331
    /// Total number of distinct (field, dimension) sparse posting lists
    /// accumulated across all sparse-vector fields.
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }
336
    /// Builds a snapshot of builder statistics, including a heap-usage
    /// breakdown recomputed from current container capacities.
    ///
    /// All byte figures are estimates: hash-map bookkeeping and average term
    /// length are approximated with fixed constants rather than measured.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        // Sizes of the building blocks used by the estimates below.
        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>(); // ptr + len + cap header
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed per-entry bookkeeping cost for hash maps.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting payloads, charged by capacity (what is actually allocated).
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate assumes an average term length of 8 bytes.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense (f32) and binary vector accumulators share one bucket.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }
        for b in self.binary_dense_vectors.values() {
            // Binary vectors store raw bytes, so capacity is already bytes.
            dense_vectors_bytes += b.vectors.capacity()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting vecs plus both map layers.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-doc position vectors plus map overhead.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
463
    /// Adds one document and returns its segment-local doc id.
    ///
    /// Each field value is dispatched to its indexing path (text, numeric,
    /// dense/binary/sparse vector, fast field); the document is always
    /// appended to the store afterwards.
    ///
    /// # Errors
    /// Propagates indexing errors (e.g. vector dimension mismatch) and
    /// store-serialization/IO errors.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row in the field-length matrix.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Multi-value element ordinals restart at 0 for every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense/binary vector fields pass through even without
            // `indexed`/`fast` (they may be stored-only); everything else
            // must be indexed or fast to be worth processing here.
            if !matches!(
                &entry.field_type,
                FieldType::DenseVector | FieldType::BinaryDenseVector
            ) && !entry.indexed
                && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once, not once per element.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // NOTE(review): for multi-valued text fields each
                        // element overwrites the slot, so the row keeps the
                        // last element's count — confirm this is intended.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // Indexed as the raw two's-complement bit pattern.
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // Indexed via the IEEE-754 bit pattern (exact match only).
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                // Mismatched type/value pairs are silently skipped.
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
566
    /// Tokenizes one text value and merges its terms into the inverted
    /// index (and, when enabled, the position index). Returns the token
    /// count for field-length/statistics purposes.
    ///
    /// A custom tokenizer registered for the field is used when present;
    /// otherwise text is split on whitespace and lowercased, keeping only
    /// alphanumeric characters. Term frequencies and positions are first
    /// accumulated in reusable scratch maps, then flushed once per distinct
    /// term, so each term costs one inverted-index lookup per document.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset scratch state; position vectors keep their allocations so
        // they can be reused across documents.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Intern the term, charging the memory estimate only the
                // first time this string is seen.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // Positions pack the element ordinal into the high bits
                    // (above bit 20) and the token position into the low bits.
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            // Custom path: the token count is simply the tokenizer's output.
            token_position = tokens.len() as u32;
        } else {
            // Default path: whitespace split + lowercase alphanumerics.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        // to_lowercase can expand to multiple chars.
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric chars produce no token.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the per-document buffers: one inverted-index (and position
        // index) update per distinct term.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        // Total token count (custom: tokens.len(); default: counter value).
        Ok(token_position)
    }
719
720 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
721 use std::fmt::Write;
722
723 self.numeric_buffer.clear();
724 write!(self.numeric_buffer, "__num_{}", value).unwrap();
725 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
726 spur
727 } else {
728 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
729 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
730 spur
731 };
732
733 let term_key = TermKey {
734 field: field.0,
735 term: term_spur,
736 };
737
738 match self.inverted_index.entry(term_key) {
739 hashbrown::hash_map::Entry::Occupied(mut o) => {
740 o.get_mut().add(doc_id, 1);
741 self.estimated_memory += size_of::<CompactPosting>();
742 }
743 hashbrown::hash_map::Entry::Vacant(v) => {
744 let mut posting = PostingListBuilder::new();
745 posting.add(doc_id, 1);
746 v.insert(posting);
747 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
748 }
749 }
750
751 Ok(())
752 }
753
754 fn index_dense_vector_field(
756 &mut self,
757 field: Field,
758 doc_id: DocId,
759 ordinal: u16,
760 vector: &[f32],
761 ) -> Result<()> {
762 let dim = vector.len();
763
764 let builder = self
765 .dense_vectors
766 .entry(field.0)
767 .or_insert_with(|| DenseVectorBuilder::new(dim));
768
769 if builder.dim != dim && builder.len() > 0 {
771 return Err(crate::Error::Schema(format!(
772 "Dense vector dimension mismatch: expected {}, got {}",
773 builder.dim, dim
774 )));
775 }
776
777 builder.add(doc_id, ordinal, vector);
778
779 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
780
781 Ok(())
782 }
783
784 fn index_binary_dense_vector_field(
786 &mut self,
787 field: Field,
788 doc_id: DocId,
789 ordinal: u16,
790 bytes: &[u8],
791 ) -> Result<()> {
792 let dim_bits = self
793 .schema
794 .get_field_entry(field)
795 .and_then(|e| e.binary_dense_vector_config.as_ref())
796 .map(|c| c.dim)
797 .ok_or_else(|| {
798 crate::Error::Schema("BinaryDenseVector field missing config".to_string())
799 })?;
800
801 let expected_byte_len = dim_bits.div_ceil(8);
802 if bytes.len() != expected_byte_len {
803 return Err(crate::Error::Schema(format!(
804 "Binary vector byte length mismatch: expected {} (dim={}), got {}",
805 expected_byte_len,
806 dim_bits,
807 bytes.len()
808 )));
809 }
810
811 let builder = self
812 .binary_dense_vectors
813 .entry(field.0)
814 .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
815
816 builder.add(doc_id, ordinal, bytes);
817 self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
818
819 Ok(())
820 }
821
    /// Indexes one sparse vector element: each (dimension, weight) entry is
    /// appended to that dimension's posting list for the field.
    ///
    /// Entries whose absolute weight is below the field's configured
    /// `weight_threshold` (default 0.0) are dropped before indexing.
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        // Counted even if every entry below falls under the threshold.
        builder.inc_vector_count();

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            // Check before `add` so the extra map-entry cost is charged
            // exactly once per new dimension.
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }
867
    /// Serializes `doc` and appends it to the document store as a
    /// length-prefixed record (little-endian `u32` length, then payload).
    ///
    /// Native targets append to the temp spill file; other targets append
    /// to the in-memory `store_buffer`.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // NOTE(review): assumes `serialize_document_into` resets the scratch
        // buffer before writing — confirm, otherwise records would
        // accumulate across documents.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        #[cfg(feature = "native")]
        {
            self.store_file
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_file.write_all(&self.doc_serialize_buffer)?;
        }
        #[cfg(not(feature = "native"))]
        {
            self.store_buffer
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
        }

        Ok(())
    }
889
    /// Consumes the builder and writes the finished segment into `dir`,
    /// returning its metadata.
    ///
    /// Positions are built first because the term dictionary embeds their
    /// offsets. On native targets the remaining artifacts (postings, store,
    /// vectors, sparse, fast fields) are built in parallel with nested
    /// `rayon::join`; elsewhere they are built sequentially. `trained`
    /// optionally supplies pre-trained vector structures (e.g. quantizers).
    ///
    /// # Errors
    /// Propagates any build or directory I/O error.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Make sure every buffered document record reaches the spill file.
        #[cfg(feature = "native")]
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions first: their offsets are needed by the term dictionary.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the accumulated structures out of `self` so the build
        // closures below can take ownership without borrowing `self`.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        #[cfg(feature = "native")]
        let store_path = self.store_path.clone();
        #[cfg(feature = "native")]
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Open one offset-tracking writer per output file; optional files
        // are only created when there is data for them.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Native: run the five independent build steps on the rayon pool.
        #[cfg(feature = "native")]
        {
            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
                rayon::join(
                    || {
                        rayon::join(
                            || {
                                postings::build_postings_streaming(
                                    inverted_index,
                                    term_interner,
                                    &position_offsets,
                                    &mut term_dict_writer,
                                    &mut postings_writer,
                                )
                            },
                            || {
                                store::build_store_streaming(
                                    &store_path,
                                    num_compression_threads,
                                    compression_level,
                                    &mut store_writer,
                                    num_docs,
                                )
                            },
                        )
                    },
                    || {
                        rayon::join(
                            || {
                                rayon::join(
                                    || -> Result<()> {
                                        if let Some(ref mut w) = vectors_writer {
                                            dense::build_vectors_streaming(
                                                dense_vectors,
                                                binary_dense_vectors,
                                                schema,
                                                trained,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                    || -> Result<()> {
                                        if let Some(ref mut w) = sparse_writer {
                                            sparse::build_sparse_streaming(
                                                &mut sparse_vectors,
                                                schema,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                )
                            },
                            || -> Result<()> {
                                if let Some(ref mut w) = fast_writer {
                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                                }
                                Ok(())
                            },
                        )
                    },
                );
            // Surface the first error only after every branch has finished.
            postings_result?;
            store_result?;
            vectors_result?;
            sparse_result?;
            fast_result?;
        }

        // Non-native: same steps, sequential, store read from memory.
        #[cfg(not(feature = "native"))]
        {
            postings::build_postings_streaming(
                inverted_index,
                term_interner,
                &position_offsets,
                &mut term_dict_writer,
                &mut postings_writer,
            )?;
            store::build_store_streaming_from_buffer(
                &self.store_buffer,
                compression_level,
                &mut store_writer,
                num_docs,
            )?;
            if let Some(ref mut w) = vectors_writer {
                dense::build_vectors_streaming(
                    dense_vectors,
                    binary_dense_vectors,
                    schema,
                    trained,
                    w,
                )?;
            }
            if let Some(ref mut w) = sparse_writer {
                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
            }
            if let Some(ref mut w) = fast_writer {
                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
            }
        }

        // Capture sizes before the writers are consumed by `finish()`.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // The spill file has been fully consumed; remove it eagerly rather
        // than waiting for `Drop` (failure is harmless and ignored).
        #[cfg(feature = "native")]
        {
            let _ = std::fs::remove_file(&self.store_path);
        }

        Ok(meta)
    }
1118}
1119
1120fn build_fast_fields_streaming(
1122 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
1123 num_docs: u32,
1124 writer: &mut dyn Write,
1125) -> Result<()> {
1126 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1127
1128 if fast_fields.is_empty() {
1129 return Ok(());
1130 }
1131
1132 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1134 field_ids.sort_unstable();
1135
1136 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1137 let mut current_offset = 0u64;
1138
1139 for &field_id in &field_ids {
1140 let ff = fast_fields.get_mut(&field_id).unwrap();
1141 ff.pad_to(num_docs);
1142
1143 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1144 toc.field_id = field_id;
1145 current_offset += bytes_written;
1146 toc_entries.push(toc);
1147 }
1148
1149 let toc_offset = current_offset;
1151 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1152
1153 Ok(())
1154}
1155
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temp spill file. A successful `build()`
        // already removes it, so this mainly covers aborted/failed builds;
        // errors (e.g. file already gone) are deliberately ignored.
        let _ = std::fs::remove_file(&self.store_path);
    }
}
1162}