1mod config;
12mod dense;
13#[cfg(feature = "diagnostics")]
14mod diagnostics;
15mod postings;
16mod sparse;
17mod store;
18
19pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
20
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::mem::size_of;
24use std::path::PathBuf;
25
26use hashbrown::HashMap;
27use lasso::{Rodeo, Spur};
28use rustc_hash::FxHashMap;
29
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31use std::sync::Arc;
32
33use crate::directories::{Directory, DirectoryWriter};
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use dense::DenseVectorBuilder;
39use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
40use sparse::SparseVectorBuilder;
41
/// Buffer size for the temporary document store file (16 MiB): large
/// sequential writes amortize syscall overhead while indexing.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted
/// index: key + posting-list builder + a fixed 24-byte allowance
/// (presumably per-entry map overhead — not derived from a type here).
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated per-string cost of the term interner: the `Spur` handle plus
/// two `usize`-sized words of bookkeeping.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of a brand-new term in the position index
/// (mirrors `NEW_TERM_OVERHEAD` for the positional structures).
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
/// Builds a single segment in memory — inverted index, positions, vectors,
/// fast fields — while spilling serialized documents to a temporary store
/// file, until `build` flushes everything to a `Directory`.
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field tokenizers (resolved from the registry in `new`, or set
    /// explicitly via `set_tokenizer`). Fields without one use the default
    /// whitespace/lowercase tokenization in `index_text_field`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so index keys carry a compact `Spur` handle.
    term_interner: Rodeo,

    /// (field, term) -> in-memory posting list (doc ids + term frequencies).
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temp store file of serialized documents.
    store_file: BufWriter<File>,
    /// Path of the temp store file; removed in `build` and again on `Drop`.
    store_path: PathBuf,

    /// Next doc id to assign; doubles as the current document count.
    next_doc_id: DocId,

    /// Per-field token totals / doc counts, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document token lengths: one stripe of
    /// `num_indexed_fields` u32 slots appended per document.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// field id -> slot within a document's stripe of `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term -> term frequency for the field value being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: term -> encoded positions for the field value being indexed
    /// (entries are cleared in place so their allocations are reused).
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch for building the current lowercased token.
    token_buffer: String,

    /// Scratch for the synthetic "__num_<value>" numeric terms.
    numeric_buffer: String,

    /// field id -> dense-vector accumulator.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// field id -> sparse-vector accumulator (per-dimension postings).
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// (field, term) -> positional postings, for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// field id -> position mode for indexed text fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-document element counters for multi-valued fields; cleared at
    /// the start of each `add_document`.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap usage, maintained incrementally on insert.
    estimated_memory: usize,

    /// Reusable buffer for serializing a document before writing the store.
    doc_serialize_buffer: Vec<u8>,

    /// field id -> fast-field (columnar) writer.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
131
132impl SegmentBuilder {
    /// Creates a builder for a new segment.
    ///
    /// Opens (truncating) a temporary store file under `config.temp_dir`,
    /// named with a fresh UUID; the file is removed by `build` and by
    /// `Drop`. From the schema it precomputes:
    /// - a slot per indexed text field for per-document token lengths,
    /// - which text fields record positions (and their mode),
    /// - tokenizers resolved by name from the default registry,
    /// - fast-field writers for every `fast` field of a supported type.
    ///
    /// # Errors
    /// Returns an error if the temporary store file cannot be created.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Pass 1: indexed text fields — length slots, position modes,
        // tokenizers.
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                // Unknown tokenizer names are silently skipped; such fields
                // fall back to the default tokenization in index_text_field.
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        // Pass 2: fast (columnar) fields; types without a fast-field
        // representation are skipped via `continue`.
        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
229
    /// Registers (or replaces) the tokenizer used for `field`, overriding
    /// whatever `new` resolved from the registry.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
233
234 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
237 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
238 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
239 ordinal
240 }
241
    /// Number of documents added so far. Doc ids are assigned sequentially
    /// from 0, so the next id to assign equals the count.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
245
    /// Incrementally maintained estimate of this builder's heap usage.
    ///
    /// Cheap (a single field read); see `stats()` for a recomputed,
    /// capacity-based breakdown.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
251
    /// Total number of distinct sparse dimensions across all sparse-vector
    /// fields (each dimension owns one posting list).
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }
256
    /// Snapshot of builder occupancy plus a recomputed heap-usage estimate.
    ///
    /// Unlike `estimated_memory_bytes` (maintained incrementally), this
    /// walks every structure and re-derives the estimate from current
    /// capacities, so it is more accurate but O(number of entries). All
    /// per-entry overhead constants are rough models, not exact allocator
    /// costs.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        // Total postings currently buffered across all terms.
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed per-entry bookkeeping cost for hash maps; a deliberately
        // coarse constant.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting storage: capacity (not len) reflects actual allocation.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner: assume ~8 bytes of string data per term on average.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors: raw f32 storage plus (doc id, ordinal) bookkeeping.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting vectors plus both map layers.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Positional postings: per-doc position vectors plus map entries.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
376
    /// Indexes one document and appends it to the temporary store.
    ///
    /// Assigns the next sequential `DocId`, reserves a stripe of length
    /// slots, and dispatches each field value to the matching indexer
    /// (text / numeric / dense / sparse) and fast-field writer. Values
    /// whose runtime type doesn't match the schema's field type are
    /// silently ignored, as are fields that are neither indexed, fast,
    /// nor dense vectors.
    ///
    /// Returns the assigned doc id.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // One length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals for multi-valued fields restart per document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense vectors may be stored without being indexed, so they
            // bypass this early-out; other fields must be indexed or fast.
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once even if the field repeats.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // For repeated fields only the last value's length
                        // ends up in the slot.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // i64 terms are keyed on the bit pattern cast to u64.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // f64 terms are keyed on the IEEE-754 bit pattern.
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        // Persist the full document to the temp store file.
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
469
    /// Tokenizes `text` and merges its terms into the inverted index (and
    /// the positional index when the field records positions).
    ///
    /// Uses the field's registered tokenizer when present; otherwise a
    /// default pass splits on whitespace, keeps alphanumeric characters,
    /// and lowercases them. Term frequencies are accumulated in a local
    /// scratch map first, so each (field, term) pair contributes exactly
    /// one posting per document.
    ///
    /// Position encoding per `PositionMode`: `Ordinal` puts the element
    /// ordinal in the bits above 20, `TokenPosition` uses the raw token
    /// position, `Full` ORs the two (token positions are assumed to fit
    /// in the low 20 bits — TODO confirm upstream guarantee).
    ///
    /// Returns the number of tokens produced for this field value.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        // None when this field doesn't record positions.
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reuse scratch buffers across calls: reset counts, and clear each
        // retained position Vec in place to keep its allocation.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Track first-time interning for the memory estimate.
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Default tokenization: whitespace split, keep alphanumerics,
            // lowercase; words reduced to nothing are skipped.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Merge this field value's local tallies into the segment-wide
        // indexes, updating the incremental memory estimate as we go.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }
612
613 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
614 use std::fmt::Write;
615
616 self.numeric_buffer.clear();
617 write!(self.numeric_buffer, "__num_{}", value).unwrap();
618 let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
619 let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);
620
621 let term_key = TermKey {
622 field: field.0,
623 term: term_spur,
624 };
625
626 let is_new_term = !self.inverted_index.contains_key(&term_key);
627 let posting = self
628 .inverted_index
629 .entry(term_key)
630 .or_insert_with(PostingListBuilder::new);
631 posting.add(doc_id, 1);
632
633 self.estimated_memory += size_of::<CompactPosting>();
634 if is_new_term {
635 self.estimated_memory += NEW_TERM_OVERHEAD;
636 }
637 if is_new_string {
638 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
639 }
640
641 Ok(())
642 }
643
644 fn index_dense_vector_field(
646 &mut self,
647 field: Field,
648 doc_id: DocId,
649 ordinal: u16,
650 vector: &[f32],
651 ) -> Result<()> {
652 let dim = vector.len();
653
654 let builder = self
655 .dense_vectors
656 .entry(field.0)
657 .or_insert_with(|| DenseVectorBuilder::new(dim));
658
659 if builder.dim != dim && builder.len() > 0 {
661 return Err(crate::Error::Schema(format!(
662 "Dense vector dimension mismatch: expected {}, got {}",
663 builder.dim, dim
664 )));
665 }
666
667 builder.add(doc_id, ordinal, vector);
668
669 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
670
671 Ok(())
672 }
673
674 fn index_sparse_vector_field(
681 &mut self,
682 field: Field,
683 doc_id: DocId,
684 ordinal: u16,
685 entries: &[(u32, f32)],
686 ) -> Result<()> {
687 let weight_threshold = self
689 .schema
690 .get_field_entry(field)
691 .and_then(|entry| entry.sparse_vector_config.as_ref())
692 .map(|config| config.weight_threshold)
693 .unwrap_or(0.0);
694
695 let builder = self
696 .sparse_vectors
697 .entry(field.0)
698 .or_insert_with(SparseVectorBuilder::new);
699
700 builder.inc_vector_count();
701
702 for &(dim_id, weight) in entries {
703 if weight.abs() < weight_threshold {
705 continue;
706 }
707
708 let is_new_dim = !builder.postings.contains_key(&dim_id);
709 builder.add(dim_id, doc_id, ordinal, weight);
710 self.estimated_memory += size_of::<(DocId, u16, f32)>();
711 if is_new_dim {
712 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
715 }
716
717 Ok(())
718 }
719
720 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
722 use byteorder::{LittleEndian, WriteBytesExt};
723
724 super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
725
726 self.store_file
727 .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
728 self.store_file.write_all(&self.doc_serialize_buffer)?;
729
730 Ok(())
731 }
732
    /// Writes the segment to `dir` and returns its metadata, consuming the
    /// builder's in-memory state (maps are `take`n, leaving them empty so
    /// `Drop` has little left to release).
    ///
    /// Order of operations:
    /// 1. flush the temp doc store buffer,
    /// 2. write positions first (the postings builder consumes their
    ///    offsets),
    /// 3. in parallel via a nested `rayon::join` tree: term dict +
    ///    postings, the compressed store (re-read from the temp file),
    ///    dense vectors, sparse vectors, and fast fields,
    /// 4. write the meta file and delete the temp store file.
    ///
    /// `trained` optionally carries pre-trained vector structures consumed
    /// by the dense-vector serializer.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Push buffered document bytes to the OS so the store builder can
        // re-read the temp file below.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions go out first: their offsets are embedded in the term
        // dictionary written later.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the heavy structures out of `self` so the rayon closures
        // below can own/borrow them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // One streaming writer per output file; optional files are only
        // created when there is data for them.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Nested joins run the independent serialization jobs in parallel;
        // each job owns a distinct writer, so there is no shared state.
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first failure from any parallel job.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        // Capture byte counts before finish() consumes/flushes the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup; Drop retries if this races or fails.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
918}
919
920fn build_fast_fields_streaming(
922 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
923 num_docs: u32,
924 writer: &mut dyn Write,
925) -> Result<()> {
926 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
927
928 if fast_fields.is_empty() {
929 return Ok(());
930 }
931
932 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
934 field_ids.sort_unstable();
935
936 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
937 let mut current_offset = 0u64;
938
939 for &field_id in &field_ids {
940 let ff = fast_fields.get_mut(&field_id).unwrap();
941 ff.pad_to(num_docs);
942
943 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
944 toc.field_id = field_id;
945 current_offset += bytes_written;
946 toc_entries.push(toc);
947 }
948
949 let toc_offset = current_offset;
951 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
952
953 Ok(())
954}
955
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temp store file in case `build` never
        // ran (or failed before deleting it). Errors are deliberately
        // ignored — the file may already be gone.
        let _ = std::fs::remove_file(&self.store_path);
    }
}