1pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16pub(crate) mod graph_bisection;
17mod postings;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23use std::fs::{File, OpenOptions};
24use std::io::{BufWriter, Write};
25use std::mem::size_of;
26use std::path::PathBuf;
27
28use hashbrown::HashMap;
29use lasso::{Rodeo, Spur};
30use rustc_hash::FxHashMap;
31
32use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
33use std::sync::Arc;
34
35use crate::directories::{Directory, DirectoryWriter};
36use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
37use crate::tokenizer::BoxedTokenizer;
38use crate::{DocId, Result};
39
40use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
41use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
42use sparse::SparseVectorBuilder;
43
/// Capacity of the `BufWriter` wrapping the temporary document-store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted index:
/// key + posting-list builder value + an assumed 24 bytes of hashmap entry
/// overhead (heuristic, not a measured allocation).
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated per-string overhead charged when interning a new term:
/// the `Spur` key plus two pointer-sized words of interner bookkeeping (heuristic).
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of a brand-new entry in the position index
/// (mirrors `NEW_TERM_OVERHEAD` for `PositionPostingListBuilder`).
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
/// In-memory builder that accumulates documents for a single segment and then
/// serializes all segment files in `build`.
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Custom tokenizers per text field, resolved from the registry in `new`
    /// or installed explicitly via `set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms; compact `Spur` keys keep the maps small.
    term_interner: Rodeo,

    /// (field, term) -> posting-list builder for all indexed fields.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered temp file that spills serialized documents to disk.
    store_file: BufWriter<File>,
    /// Path of the temp store file (removed in `build` and again on drop).
    store_path: PathBuf,

    /// Next DocId to assign; equals the number of documents added so far.
    next_doc_id: DocId,

    /// Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document token counts: one slot per indexed text field,
    /// laid out as `doc_index * num_indexed_fields + slot`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> slot index within a document's length row.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: per-document term frequencies (cleared and reused per text value).
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: per-document encoded positions per term (values cleared, reused).
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for the fallback (whitespace/lowercase) tokenizer.
    token_buffer: String,

    /// Scratch buffer for formatting numeric terms (`__num_{v}`).
    numeric_buffer: String,

    /// Dense f32 vector builders, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Binary (bit-packed) dense vector builders, keyed by field id.
    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,

    /// Sparse vector builders, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// (field, term) -> position posting lists for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields with positions enabled, mapped to their position mode.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field element ordinal counters, reset at the start of each document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally maintained estimate of heap usage, in bytes.
    estimated_memory: usize,

    /// Scratch buffer for document serialization in `write_document_to_store`.
    doc_serialize_buffer: Vec<u8>,

    /// Fast-field (columnar) writers, keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
136
137impl SegmentBuilder {
    /// Creates a new segment builder.
    ///
    /// Opens a temporary spill file for the document store (named after a
    /// fresh UUID under `config.temp_dir`), resolves per-field tokenizers
    /// from the default registry, assigns a field-length slot to every
    /// indexed text field, and prepares fast-field writers for every field
    /// marked `fast` with a supported type.
    ///
    /// # Errors
    /// Returns an error if the temporary store file cannot be created.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        // Unique name so concurrent builders sharing a temp dir don't collide.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            // Only indexed text fields get a per-document length slot; other
            // field types never contribute token counts.
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                // A tokenizer name missing from the registry is silently
                // ignored; indexing then falls back to the built-in
                // whitespace/lowercase path in `index_text_field`.
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                // Field types without fast-field support are skipped entirely.
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            binary_dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
235
    /// Overrides the tokenizer used for `field`, replacing any tokenizer
    /// resolved from the registry during construction.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
239
240 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
243 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
244 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
245 ordinal
246 }
247
    /// Number of documents added so far (also the next doc id to be assigned).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
251
    /// Running estimate of heap memory held by in-memory index structures,
    /// maintained incrementally as documents are added. See `stats()` for a
    /// recomputed, itemized breakdown.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
257
258 pub fn sparse_dim_count(&self) -> usize {
260 self.sparse_vectors.values().map(|b| b.postings.len()).sum()
261 }
262
    /// Recomputes a detailed snapshot of builder statistics, including an
    /// estimated memory breakdown derived from the current capacities of the
    /// in-memory structures.
    ///
    /// All byte figures are estimates: hashmap entry overheads, interner
    /// arena overhead, and average term length are fixed heuristics, not
    /// measured allocations.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        // Size of a Vec header (ptr/len/cap), charged once per owned Vec.
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Heuristic per-entry bookkeeping overhead for hash maps.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting payloads use capacity (not len) so reserved-but-unused
        // space is counted as well.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Assumed average interned term length in bytes (heuristic).
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }
        for b in self.binary_dense_vectors.values() {
            // Binary vectors store raw bytes, so capacity is already in bytes.
            dense_vectors_bytes += b.vectors.capacity()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        // NOTE: unlike the incremental `estimated_memory` counter, this total
        // is recomputed from scratch; the two can legitimately differ.
        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
389
    /// Adds a document to the segment, assigning and returning its `DocId`.
    ///
    /// Indexes each field value according to its schema entry (inverted
    /// index, vector builders, fast-field columns), then appends the
    /// serialized document to the temporary store file.
    ///
    /// # Errors
    /// Propagates indexing errors (e.g. vector dimension mismatches) and I/O
    /// errors from the store spill file. Note: `next_doc_id` is incremented
    /// before indexing, so a failed document still consumes a doc id.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals for multi-valued fields restart at 0 each document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip fields that are neither indexed nor fast; dense-vector
            // types are exempt because they may still be stored.
            if !matches!(
                &entry.field_type,
                FieldType::DenseVector | FieldType::BinaryDenseVector
            ) && !entry.indexed
                && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once even when the field is
                        // multi-valued (ordinal 0 is the first value).
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // NOTE(review): for multi-valued text fields this
                        // overwrites the slot with the *last* value's token
                        // count rather than accumulating — confirm intended.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // Bit-cast to u64; exact-match lookups stay consistent
                        // as long as queries apply the same cast.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // f64 indexed by its IEEE-754 bit pattern (exact match only).
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                // Mismatched (type, value) pairs are silently ignored.
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
492
    /// Tokenizes `text` and updates the inverted index (and, when enabled for
    /// the field, the position index) for one text value. Returns the token
    /// count used for field-length accounting in `add_document`.
    ///
    /// Uses the field's registered tokenizer when present; otherwise falls
    /// back to a built-in whitespace split keeping lowercased alphanumerics.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Per-call scratch buffers; clearing (not dropping) keeps their
        // allocations for reuse across documents.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Intern the term; memory is charged only for new strings.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // Encoded positions pack the element ordinal into the
                    // upper bits (<< 20), leaving ~1M token positions per
                    // element value.
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Fallback tokenizer: split on whitespace, keep alphanumeric
            // characters, lowercase (may expand to multiple chars).
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric chars produce no token and do
                // not advance the position counter.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush this value's term frequencies into the segment-wide inverted
        // index, updating the memory estimate as we go.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
645
    /// Indexes a numeric value as an exact-match term of the form
    /// `__num_{value}` with term frequency 1. Callers pre-encode i64
    /// (bit-cast) and f64 (`to_bits`) into the u64.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::fmt::Write;

        // Reuse the scratch buffer to avoid an allocation per value;
        // write! into a String cannot fail, hence the unwrap.
        self.numeric_buffer.clear();
        write!(self.numeric_buffer, "__num_{}", value).unwrap();
        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
            spur
        } else {
            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
            spur
        };

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        match self.inverted_index.entry(term_key) {
            hashbrown::hash_map::Entry::Occupied(mut o) => {
                o.get_mut().add(doc_id, 1);
                self.estimated_memory += size_of::<CompactPosting>();
            }
            hashbrown::hash_map::Entry::Vacant(v) => {
                let mut posting = PostingListBuilder::new();
                posting.add(doc_id, 1);
                v.insert(posting);
                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
            }
        }

        Ok(())
    }
679
    /// Buffers a dense f32 vector for `field`, validating that its dimension
    /// matches the vectors already collected for that field.
    ///
    /// # Errors
    /// Returns `Error::Schema` on a dimension mismatch.
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        // The first vector seen for a field fixes the expected dimension.
        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // A freshly created builder always matches `dim` by construction, so
        // only an already-populated builder can mismatch.
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }
709
    /// Buffers a binary (bit-packed) dense vector for `field`, validating the
    /// byte length against the schema-configured bit dimension.
    ///
    /// # Errors
    /// Returns `Error::Schema` if the field has no binary-dense-vector config
    /// or if `bytes.len()` does not equal `ceil(dim_bits / 8)`.
    fn index_binary_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        bytes: &[u8],
    ) -> Result<()> {
        let dim_bits = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.binary_dense_vector_config.as_ref())
            .map(|c| c.dim)
            .ok_or_else(|| {
                crate::Error::Schema("BinaryDenseVector field missing config".to_string())
            })?;

        // dim_bits is a bit count; the payload is packed 8 bits per byte.
        let expected_byte_len = dim_bits.div_ceil(8);
        if bytes.len() != expected_byte_len {
            return Err(crate::Error::Schema(format!(
                "Binary vector byte length mismatch: expected {} (dim={}), got {}",
                expected_byte_len,
                dim_bits,
                bytes.len()
            )));
        }

        let builder = self
            .binary_dense_vectors
            .entry(field.0)
            .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));

        builder.add(doc_id, ordinal, bytes);
        self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();

        Ok(())
    }
747
    /// Buffers sparse-vector entries for `field`, dropping entries whose
    /// absolute weight falls below the field's configured threshold.
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        // Threshold defaults to 0.0 (keep everything) when unconfigured.
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        // Counted once per vector value, even if every entry is filtered out.
        builder.inc_vector_count();

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                // New dimension: charge key + Vec header + map entry overhead.
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }
793
    /// Serializes `doc` and appends it to the temporary store file as a
    /// length-prefixed (u32, little-endian) record.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // NOTE(review): assumes `serialize_document_into` resets/overwrites
        // `doc_serialize_buffer` rather than appending — confirm in store.rs.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        // NOTE(review): `len() as u32` would silently truncate a record
        // >= 4 GiB; acceptable only if document size is bounded upstream.
        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }
806
    /// Consumes the builder's in-memory state and writes all segment files to
    /// `dir`, returning the segment metadata (also persisted to the meta file).
    ///
    /// Positions are flushed first because the term dictionary embeds each
    /// term's position offset; postings, store, vectors, sparse, and fast
    /// files are then built concurrently on rayon.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Ensure every buffered store record reaches the temp file before the
        // store-compression pass re-reads it from disk.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions must be written before postings: the postings/term-dict
        // writer consumes `position_offsets` below.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the rayon closures can
        // own/borrow them independently of the async context.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Optional outputs only get a writer (and thus a file) when there is
        // content to write.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Nested rayon::join fans the five independent serialization jobs out
        // across the pool; each job writes to its own writer.
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            binary_dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first failure after all jobs have completed.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        // Capture sizes before finish() consumes the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal of the spill file; Drop retries it as well.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
994}
995
996fn build_fast_fields_streaming(
998 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
999 num_docs: u32,
1000 writer: &mut dyn Write,
1001) -> Result<()> {
1002 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1003
1004 if fast_fields.is_empty() {
1005 return Ok(());
1006 }
1007
1008 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1010 field_ids.sort_unstable();
1011
1012 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1013 let mut current_offset = 0u64;
1014
1015 for &field_id in &field_ids {
1016 let ff = fast_fields.get_mut(&field_id).unwrap();
1017 ff.pad_to(num_docs);
1018
1019 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1020 toc.field_id = field_id;
1021 current_offset += bytes_written;
1022 toc_entries.push(toc);
1023 }
1024
1025 let toc_offset = current_offset;
1027 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1028
1029 Ok(())
1030}
1031
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file. `build` removes it
        // on the success path, so the file may already be gone; the error is
        // deliberately ignored.
        let _ = std::fs::remove_file(&self.store_path);
    }
}