1#[cfg_attr(not(feature = "native"), allow(dead_code))]
12pub(crate) mod bmp;
13mod config;
14mod dense;
15#[cfg(feature = "diagnostics")]
16mod diagnostics;
17#[cfg_attr(not(feature = "native"), allow(dead_code))]
18pub(crate) mod graph_bisection;
19mod postings;
20mod sparse;
21mod store;
22
23pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
24
25#[cfg(feature = "native")]
26use std::fs::{File, OpenOptions};
27#[cfg(feature = "native")]
28use std::io::BufWriter;
29use std::io::Write;
30use std::mem::size_of;
31#[cfg(feature = "native")]
32use std::path::PathBuf;
33
34use hashbrown::HashMap;
35use rustc_hash::FxHashMap;
36
37#[cfg(feature = "native")]
39use lasso::{Rodeo, Spur};
40
/// Minimal drop-in replacement for `lasso::{Rodeo, Spur}` used on
/// non-"native" builds (e.g. wasm), where the real `lasso` crate is not
/// pulled in. Only the subset of the API this module needs is provided:
/// `get`, `get_or_intern`, `resolve`, `len`.
#[cfg(not(feature = "native"))]
pub(crate) mod simple_interner {
    use hashbrown::HashMap;

    /// Opaque key for an interned string (index into `Rodeo::strings`).
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Spur(u32);

    /// Append-only string interner: each distinct string is stored once and
    /// identified by a stable `Spur`.
    pub struct Rodeo {
        // Owned storage. A `Box<str>`'s heap allocation never moves, even
        // when this Vec reallocates (only the box pointers move).
        strings: Vec<Box<str>>,
        // Contents -> id lookup. Keys borrow from the boxes in `strings`
        // with the lifetime erased to 'static (see SAFETY below); dropping
        // a `&str` key is a no-op, so drop order is not a concern.
        map: HashMap<&'static str, u32>,
    }

    impl Rodeo {
        pub fn new() -> Self {
            Self {
                strings: Vec::new(),
                map: HashMap::new(),
            }
        }

        /// Returns the id for `key` if it was interned earlier.
        pub fn get(&self, key: &str) -> Option<Spur> {
            self.map.get(key).map(|&id| Spur(id))
        }

        /// Interns `key` (if not already present) and returns its id.
        pub fn get_or_intern(&mut self, key: &str) -> Spur {
            if let Some(&id) = self.map.get(key) {
                return Spur(id);
            }
            let id = self.strings.len() as u32;
            let boxed: Box<str> = key.into();
            // SAFETY: the reference points into the Box<str> heap
            // allocation, which is never freed, mutated, or moved while
            // `self` is alive (strings are only ever pushed, never removed,
            // and growing `strings` relocates the boxes, not their heap
            // data). The erased-lifetime reference is only reachable via
            // `map`, which lives exactly as long as `strings`.
            let static_ref: &'static str = unsafe { &*(boxed.as_ref() as *const str) };
            self.strings.push(boxed);
            self.map.insert(static_ref, id);
            Spur(id)
        }

        /// Resolves an id back to its string.
        /// Panics if `spur` did not come from this interner.
        pub fn resolve(&self, spur: &Spur) -> &str {
            &self.strings[spur.0 as usize]
        }

        /// Number of distinct interned strings.
        pub fn len(&self) -> usize {
            self.strings.len()
        }
    }
}
93
94#[cfg(not(feature = "native"))]
95use simple_interner::{Rodeo, Spur};
96
97use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
98use std::sync::Arc;
99
100use crate::directories::{Directory, DirectoryWriter};
101use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
102use crate::tokenizer::BoxedTokenizer;
103use crate::{DocId, Result};
104
105use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
106use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
107use sparse::SparseVectorBuilder;
108
/// Capacity (16 MiB) of the store spill-file writer on native builds /
/// the in-memory store buffer on non-native builds.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
/// Estimated heap cost of the first posting for a brand-new term: map key,
/// posting-list builder, plus ~24 bytes of hash-map bookkeeping.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated heap cost of interning a previously-unseen string
/// (id + arena bookkeeping), excluding the string bytes themselves.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of a brand-new term in the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
122
/// In-memory builder for a single index segment.
///
/// Accumulates postings, stored documents, vectors, and fast-field columns
/// as documents are added via [`SegmentBuilder::add_document`], then
/// serializes everything to a directory in [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Custom per-field tokenizers; fields without one use the built-in
    /// whitespace/lowercase fallback in `index_text_field`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term text so the posting maps can key on a compact `Spur`.
    term_interner: Rodeo,

    /// (field, term) -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary spill file for stored documents (native builds stream to
    /// disk)...
    #[cfg(feature = "native")]
    store_file: BufWriter<File>,
    #[cfg(feature = "native")]
    store_path: PathBuf,
    /// ...while non-native builds buffer the store in memory.
    #[cfg(not(feature = "native"))]
    store_buffer: Vec<u8>,

    /// Next DocId to assign; also equals the number of documents added.
    next_doc_id: DocId,

    /// Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat `num_docs x num_indexed_fields` matrix of per-document token
    /// counts, indexed by `doc_row_base + slot`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> column ("slot") within `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term frequencies for the text value currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: encoded positions for the text value currently being indexed.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch: lowercased token built by the fallback tokenizer.
    token_buffer: String,

    /// Scratch: formatting buffer for synthetic numeric terms ("__num_<v>").
    numeric_buffer: String,

    /// Per-field f32 dense-vector builders, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Per-field binary (packed-bit) dense-vector builders, keyed by field id.
    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,

    /// Per-field sparse-vector builders, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// (field, term) -> positions, populated only for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Field id -> position mode. Only fields with positions configured are
    /// inserted (in `new`), so stored values are always `Some`.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field element counter within the current document (reset at the
    /// start of `add_document`), used for multi-valued fields.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Cheap running estimate of heap usage, updated incrementally on each
    /// insertion (see `stats()` for a recomputed breakdown).
    estimated_memory: usize,

    /// Reusable buffer for serializing one document before writing the store.
    doc_serialize_buffer: Vec<u8>,

    /// Fast-field (columnar) writers, keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
205
206impl SegmentBuilder {
    /// Creates an empty builder for `schema`.
    ///
    /// On native builds this opens a uniquely-named temporary spill file for
    /// stored documents under `config.temp_dir` (removed again on drop /
    /// build); non-native builds buffer the store in memory instead.
    ///
    /// # Errors
    /// Returns an error if the temporary store file cannot be created
    /// (native builds only).
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        #[cfg(feature = "native")]
        let (store_file, store_path) = {
            // Random name so concurrent builders sharing temp_dir don't collide.
            let segment_id = uuid::Uuid::new_v4();
            let store_path = config
                .temp_dir
                .join(format!("hermes_store_{}.tmp", segment_id));
            let store_file = BufWriter::with_capacity(
                STORE_BUFFER_SIZE,
                OpenOptions::new()
                    .create(true)
                    .write(true)
                    .truncate(true)
                    .open(&store_path)?,
            );
            (store_file, store_path)
        };

        let registry = crate::tokenizer::TokenizerRegistry::new();
        // Assign each indexed text field a dense "slot" (column index into
        // the flat doc_field_lengths matrix), and record which of them track
        // positions or use a named tokenizer from the registry.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        // Pre-create a columnar writer for every `fast` field whose type is
        // supported; unsupported types are silently skipped.
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            #[cfg(feature = "native")]
            store_file,
            #[cfg(feature = "native")]
            store_path,
            #[cfg(not(feature = "native"))]
            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            binary_dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
311
    /// Registers (or replaces) the tokenizer for `field`, overriding any
    /// tokenizer resolved from the schema in `new`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
315
316 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
319 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
320 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
321 ordinal
322 }
323
    /// Number of documents added to this builder so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
327
    /// Cheap O(1) running estimate of the builder's heap usage, maintained
    /// incrementally as data is inserted. See `stats()` for a recomputed,
    /// more detailed breakdown.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
333
334 pub fn sparse_dim_count(&self) -> usize {
336 self.sparse_vectors.values().map(|b| b.postings.len()).sum()
337 }
338
    /// Recomputes a detailed memory-usage estimate from the current
    /// container sizes and returns it together with document/term counts.
    ///
    /// All byte figures are estimates: hash-map bookkeeping is approximated
    /// by a flat per-entry constant and interned term length by an assumed
    /// average, so numbers here can differ from `estimated_memory_bytes()`.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        // Per-entry size constants used by the estimates below.
        let compact_posting_size = size_of::<CompactPosting>();
        // Vec header only (ptr/len/cap); heap payloads are counted separately.
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Approximate hash-map bookkeeping cost per entry.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Inverted index: posting vectors (capacity, not len, since unused
        // capacity is still allocated)...
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        // ...plus per-term map-entry overhead.
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate assumes an average term length of 8 bytes.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors (f32 and binary): payload plus (doc_id, ordinal) lists.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }
        for b in self.binary_dense_vectors.values() {
            // Binary vectors store raw bytes, so capacity is already bytes.
            dense_vectors_bytes += b.vectors.capacity()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting vectors plus nested and
        // outer map-entry overheads.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-document position vectors plus map entries.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
465
    /// Adds `doc` to the segment: indexes its field values according to the
    /// schema, records fast-field values, and appends the document to the
    /// store. Returns the DocId assigned to the document.
    ///
    /// # Errors
    /// Propagates errors from indexing (e.g. vector dimension mismatches)
    /// and from writing the document store.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row in the flat field-length matrix.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals for multi-valued fields restart at 0 per document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip values with nothing to index or store columnar. Dense and
            // binary-dense vectors are exempt because they may be stored-only
            // (their match arms check `indexed || stored` below).
            // NOTE(review): stored-only SparseVector fields are skipped here —
            // confirm that is intentional.
            if !matches!(
                &entry.field_type,
                FieldType::DenseVector | FieldType::BinaryDenseVector
            ) && !entry.indexed
                && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        // Corpus stats: total tokens over all elements, but
                        // each document counted once (on its first element).
                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // NOTE(review): for multi-valued text fields this
                        // overwrites the slot, keeping only the last
                        // element's token count — confirm intended.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // Indexed via the raw two's-complement bit pattern.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // Indexed via the exact IEEE-754 bit pattern.
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        // The whole document goes to the store regardless of field indexing.
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
568
    /// Tokenizes one text value and merges its terms into the inverted (and,
    /// when enabled, position) index. Returns the number of tokens produced,
    /// which callers use as the field length for scoring.
    ///
    /// A custom tokenizer registered for `field` is used when present;
    /// otherwise a fallback tokenizer splits on whitespace, keeps only
    /// alphanumeric characters, and lowercases them.
    ///
    /// Positions, when recorded, pack the element ordinal into bits 20 and
    /// above, leaving the low 20 bits for the token position (depending on
    /// the field's `PositionMode`).
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reuse the scratch buffers; clearing keeps their capacity.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // get-then-intern (instead of a bare get_or_intern) so
                // `estimated_memory` is only charged for genuinely new terms.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Fallback tokenizer: whitespace split, alphanumeric-only,
            // lowercased (one char may lowercase to several).
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric chars produce no token and do
                // not advance the token position.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the per-value scratch buffers into the segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
721
722 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
723 use std::fmt::Write;
724
725 self.numeric_buffer.clear();
726 write!(self.numeric_buffer, "__num_{}", value).unwrap();
727 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
728 spur
729 } else {
730 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
731 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
732 spur
733 };
734
735 let term_key = TermKey {
736 field: field.0,
737 term: term_spur,
738 };
739
740 match self.inverted_index.entry(term_key) {
741 hashbrown::hash_map::Entry::Occupied(mut o) => {
742 o.get_mut().add(doc_id, 1);
743 self.estimated_memory += size_of::<CompactPosting>();
744 }
745 hashbrown::hash_map::Entry::Vacant(v) => {
746 let mut posting = PostingListBuilder::new();
747 posting.add(doc_id, 1);
748 v.insert(posting);
749 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
750 }
751 }
752
753 Ok(())
754 }
755
756 fn index_dense_vector_field(
758 &mut self,
759 field: Field,
760 doc_id: DocId,
761 ordinal: u16,
762 vector: &[f32],
763 ) -> Result<()> {
764 let dim = vector.len();
765
766 let builder = self
767 .dense_vectors
768 .entry(field.0)
769 .or_insert_with(|| DenseVectorBuilder::new(dim));
770
771 if builder.dim != dim && builder.len() > 0 {
773 return Err(crate::Error::Schema(format!(
774 "Dense vector dimension mismatch: expected {}, got {}",
775 builder.dim, dim
776 )));
777 }
778
779 builder.add(doc_id, ordinal, vector);
780
781 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
782
783 Ok(())
784 }
785
786 fn index_binary_dense_vector_field(
788 &mut self,
789 field: Field,
790 doc_id: DocId,
791 ordinal: u16,
792 bytes: &[u8],
793 ) -> Result<()> {
794 let dim_bits = self
795 .schema
796 .get_field_entry(field)
797 .and_then(|e| e.binary_dense_vector_config.as_ref())
798 .map(|c| c.dim)
799 .ok_or_else(|| {
800 crate::Error::Schema("BinaryDenseVector field missing config".to_string())
801 })?;
802
803 let expected_byte_len = dim_bits.div_ceil(8);
804 if bytes.len() != expected_byte_len {
805 return Err(crate::Error::Schema(format!(
806 "Binary vector byte length mismatch: expected {} (dim={}), got {}",
807 expected_byte_len,
808 dim_bits,
809 bytes.len()
810 )));
811 }
812
813 let builder = self
814 .binary_dense_vectors
815 .entry(field.0)
816 .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
817
818 builder.add(doc_id, ordinal, bytes);
819 self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
820
821 Ok(())
822 }
823
824 fn index_sparse_vector_field(
831 &mut self,
832 field: Field,
833 doc_id: DocId,
834 ordinal: u16,
835 entries: &[(u32, f32)],
836 ) -> Result<()> {
837 let weight_threshold = self
839 .schema
840 .get_field_entry(field)
841 .and_then(|entry| entry.sparse_vector_config.as_ref())
842 .map(|config| config.weight_threshold)
843 .unwrap_or(0.0);
844
845 let builder = self
846 .sparse_vectors
847 .entry(field.0)
848 .or_insert_with(SparseVectorBuilder::new);
849
850 builder.inc_vector_count();
851
852 for &(dim_id, weight) in entries {
853 if weight.abs() < weight_threshold {
855 continue;
856 }
857
858 let is_new_dim = !builder.postings.contains_key(&dim_id);
859 builder.add(dim_id, doc_id, ordinal, weight);
860 self.estimated_memory += size_of::<(DocId, u16, f32)>();
861 if is_new_dim {
862 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
865 }
866
867 Ok(())
868 }
869
    /// Appends `doc` to the temporary store as a length-prefixed record:
    /// a little-endian u32 byte length followed by the serialized document.
    ///
    /// The serialization buffer is reused across calls.
    /// NOTE(review): assumes `serialize_document_into` resets/overwrites
    /// `doc_serialize_buffer` on each call — confirm in `store`.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        // Native builds spill to the temp file; non-native builds append to
        // the in-memory buffer. Record format is identical.
        #[cfg(feature = "native")]
        {
            self.store_file
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_file.write_all(&self.doc_serialize_buffer)?;
        }
        #[cfg(not(feature = "native"))]
        {
            self.store_buffer
                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
        }

        Ok(())
    }
891
    /// Serializes everything accumulated in this builder into the files of a
    /// new segment in `dir` and returns the segment's metadata.
    ///
    /// Consumes the builder. `trained` optionally supplies pre-trained
    /// vector structures used when serializing dense vectors. On native
    /// builds the independent outputs (postings, store, vectors, sparse,
    /// fast fields) are serialized in parallel via nested `rayon::join`;
    /// non-native builds run the same steps sequentially.
    ///
    /// # Errors
    /// Propagates I/O and serialization errors from any of the writers.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Ensure all buffered store records are on disk before the store
        // serializer re-reads the temp file.
        #[cfg(feature = "native")]
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are written first: the term dictionary needs each
        // term's position offset.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the heavy structures out of `self` so the serializers below
        // can consume them (and free them as they go).
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        #[cfg(feature = "native")]
        let store_path = self.store_path.clone();
        #[cfg(feature = "native")]
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // One offset-tracking streaming writer per output file; optional
        // outputs are only created when they have data.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Native: fan the five independent serializations out across the
        // rayon pool ((postings+store) || ((vectors+sparse) || fast)), then
        // surface the first error from each.
        #[cfg(feature = "native")]
        {
            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
                rayon::join(
                    || {
                        rayon::join(
                            || {
                                postings::build_postings_streaming(
                                    inverted_index,
                                    term_interner,
                                    &position_offsets,
                                    &mut term_dict_writer,
                                    &mut postings_writer,
                                )
                            },
                            || {
                                store::build_store_streaming(
                                    &store_path,
                                    num_compression_threads,
                                    compression_level,
                                    &mut store_writer,
                                    num_docs,
                                )
                            },
                        )
                    },
                    || {
                        rayon::join(
                            || {
                                rayon::join(
                                    || -> Result<()> {
                                        if let Some(ref mut w) = vectors_writer {
                                            dense::build_vectors_streaming(
                                                dense_vectors,
                                                binary_dense_vectors,
                                                schema,
                                                trained,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                    || -> Result<()> {
                                        if let Some(ref mut w) = sparse_writer {
                                            sparse::build_sparse_streaming(
                                                &mut sparse_vectors,
                                                schema,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                )
                            },
                            || -> Result<()> {
                                if let Some(ref mut w) = fast_writer {
                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                                }
                                Ok(())
                            },
                        )
                    },
                );
            postings_result?;
            store_result?;
            vectors_result?;
            sparse_result?;
            fast_result?;
        }

        // Non-native: same steps, sequential, reading the store from the
        // in-memory buffer instead of a temp file.
        #[cfg(not(feature = "native"))]
        {
            postings::build_postings_streaming(
                inverted_index,
                term_interner,
                &position_offsets,
                &mut term_dict_writer,
                &mut postings_writer,
            )?;
            store::build_store_streaming_from_buffer(
                &self.store_buffer,
                compression_level,
                &mut store_writer,
                num_docs,
            )?;
            if let Some(ref mut w) = vectors_writer {
                dense::build_vectors_streaming(
                    dense_vectors,
                    binary_dense_vectors,
                    schema,
                    trained,
                    w,
                )?;
            }
            if let Some(ref mut w) = sparse_writer {
                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
            }
            if let Some(ref mut w) = fast_writer {
                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
            }
        }

        // Capture written sizes (for logging) before finishing the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        // Explicitly release the remaining large structures.
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Remove the temp store file now; Drop would also do this, and a
        // second removal is a harmless no-op.
        #[cfg(feature = "native")]
        {
            let _ = std::fs::remove_file(&self.store_path);
        }

        Ok(meta)
    }
1120}
1121
1122fn build_fast_fields_streaming(
1124 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
1125 num_docs: u32,
1126 writer: &mut dyn Write,
1127) -> Result<()> {
1128 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1129
1130 if fast_fields.is_empty() {
1131 return Ok(());
1132 }
1133
1134 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1136 field_ids.sort_unstable();
1137
1138 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1139 let mut current_offset = 0u64;
1140
1141 for &field_id in &field_ids {
1142 let ff = fast_fields.get_mut(&field_id).unwrap();
1143 ff.pad_to(num_docs);
1144
1145 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1146 toc.field_id = field_id;
1147 current_offset += bytes_written;
1148 toc_entries.push(toc);
1149 }
1150
1151 let toc_offset = current_offset;
1153 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1154
1155 Ok(())
1156}
1157
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    /// Best-effort removal of the temporary store file. `build()` already
    /// deletes it on success, so this mainly covers builders dropped
    /// without building; errors (e.g. file already gone) are ignored.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}