1pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16pub(crate) mod graph_bisection;
17mod postings;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23use std::fs::{File, OpenOptions};
24use std::io::{BufWriter, Write};
25use std::mem::size_of;
26use std::path::PathBuf;
27
28use hashbrown::HashMap;
29use lasso::{Rodeo, Spur};
30use rustc_hash::FxHashMap;
31
32use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
33use std::sync::Arc;
34
35use crate::directories::{Directory, DirectoryWriter};
36use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
37use crate::tokenizer::BoxedTokenizer;
38use crate::{DocId, Result};
39
40use dense::DenseVectorBuilder;
41use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
42use sparse::SparseVectorBuilder;
43
/// Capacity of the buffered writer for the temporary on-disk document store
/// (16 MiB) — large enough to amortize syscalls during bulk ingestion.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted index:
/// the key, the posting-list builder, plus ~24 bytes of hash-map slot
/// overhead. Used only for the incremental `estimated_memory` counter.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated per-string bookkeeping cost of interning a new term in the
/// `Rodeo` interner (a `Spur` plus two pointer-sized words of table overhead).
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of a brand-new entry in the position index; mirrors
/// `NEW_TERM_OVERHEAD` but for `PositionPostingListBuilder`.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
57
/// Accumulates one segment's worth of documents in memory (plus a temporary
/// on-disk document store) and serializes everything to segment files in
/// [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field tokenizer overrides, resolved from the schema in `new` or
    /// installed later via `set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner mapping term text to compact `Spur` keys.
    term_interner: Rodeo,

    /// In-memory inverted index under construction: (field, term) -> postings.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer for the temporary document store on disk.
    store_file: BufWriter<File>,
    /// Path of the temporary store file; removed in `build` and again
    /// (best-effort) on drop.
    store_path: PathBuf,

    /// Next document id to assign; also the count of documents added so far.
    next_doc_id: DocId,

    /// Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document token counts: each document occupies a stripe
    /// of `num_indexed_fields` consecutive slots.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> slot index within a document's `doc_field_lengths` stripe.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term -> term frequency for the text value currently indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: term -> encoded positions for the text value currently
    /// indexed (values are cleared, not dropped, between calls).
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for the fallback tokenizer's lowercased token.
    token_buffer: String,

    /// Scratch buffer for formatting synthetic numeric terms ("__num_<v>").
    numeric_buffer: String,

    /// Per-field dense-vector accumulators, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Per-field sparse-vector accumulators, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields that record positions.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Field id -> configured position mode; only populated for fields whose
    /// schema entry enables positions (so the stored Option is always Some).
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field element ordinal counters; reset at the start of each
    /// document to number repeated values of multi-valued fields.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Cheap running estimate of heap usage, updated incrementally.
    estimated_memory: usize,

    /// Reusable buffer for serializing documents into the store.
    doc_serialize_buffer: Vec<u8>,

    /// Columnar fast-field writers, keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
133
134impl SegmentBuilder {
    /// Create a builder for a fresh segment.
    ///
    /// Opens (truncating) a uniquely-named temporary document-store file in
    /// `config.temp_dir`, resolves per-field tokenizers from the schema, and
    /// precomputes the slot layout for per-document field lengths.
    ///
    /// # Errors
    /// Returns an error if the temporary store file cannot be created.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        // Unique name so concurrent builders in the same temp dir don't clash.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // First pass over the schema: indexed text fields get a field-length
        // slot, optional position tracking, and (if named) a tokenizer.
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                // Unknown tokenizer names are silently skipped; the field
                // falls back to the built-in whitespace/lowercase tokenizer.
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        // Second pass: instantiate columnar fast-field writers. Field types
        // without a fast-field representation are skipped via `continue`.
        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            // Pre-sized from config to reduce rehashing during ingestion.
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
231
    /// Install (or replace) the tokenizer used for `field`, overriding any
    /// tokenizer resolved from the schema in [`SegmentBuilder::new`].
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
235
236 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
239 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
240 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
241 ordinal
242 }
243
    /// Number of documents added so far (equivalently, the next doc id).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
247
    /// Cheap running estimate of this builder's heap usage, maintained
    /// incrementally while indexing. See `stats()` for a recomputed,
    /// per-component breakdown.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
253
254 pub fn sparse_dim_count(&self) -> usize {
256 self.sparse_vectors.values().map(|b| b.postings.len()).sum()
257 }
258
    /// Recompute a detailed snapshot of the builder's state and estimated
    /// memory footprint, component by component.
    ///
    /// All byte figures are estimates built from `size_of` plus fixed
    /// fudge factors for container overhead — they are for flush heuristics
    /// and diagnostics, not exact accounting.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        // Total postings currently buffered across all terms.
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        // Vec header (ptr/len/cap) — charged once per heap-allocated Vec.
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Flat per-entry overhead assumed for hash-map slots (control bytes,
        // padding). Rough by design.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting list payloads: charged at capacity, not len, since the
        // allocation is what occupies memory.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        // Inverted-index map entries themselves (key + value + slot).
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate assumes an average term length of 8 bytes.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors: raw f32 payload plus the (doc_id, ordinal) side Vec.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting Vecs plus both map layers.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-doc position Vecs plus map entries.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
378
    /// Index one document: assigns the next doc id, routes every field value
    /// to the appropriate index structure (inverted index, fast fields,
    /// dense/sparse vectors), and appends the document to the temp store.
    ///
    /// Returns the assigned doc id.
    ///
    /// # Errors
    /// Propagates failures from per-field indexing or from writing the
    /// document to the temporary store.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's stripe of per-field token-length slots.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals (numbering of repeated values) restart per doc.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            // Values for fields unknown to the schema are skipped.
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip fields that are neither indexed nor fast; dense vectors
            // are exempt (they may be stored without being indexed).
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once, on its first value.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // NOTE(review): for multi-valued text fields this
                        // overwrites the slot with the last value's token
                        // count rather than accumulating — confirm intended.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // Bit-pattern cast to u64; presumably the query side
                        // applies the same cast — TODO confirm.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // Indexed by IEEE-754 bit pattern (exact-match terms).
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                // Type/value mismatches are silently ignored.
                _ => {}
            }
        }

        // The full document (including stored-only fields) goes to the store.
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
471
    /// Tokenize one text value and merge its terms into the inverted index
    /// (and, when enabled, the position index).
    ///
    /// Uses the field's registered tokenizer if present, otherwise a built-in
    /// fallback that lowercases alphanumeric runs split on whitespace.
    /// Term frequencies and positions are first accumulated in per-call
    /// scratch buffers, then flushed into the shared indexes.
    ///
    /// Returns the number of tokens produced for this value.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset scratch state. Position Vecs are cleared in place so their
        // allocations are reused across calls.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            // Custom tokenizer path: positions come from the tokens themselves.
            for token in &tokens {
                // `get` before `get_or_intern` so memory is only charged for
                // genuinely new strings.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // Encoding packs the element ordinal in the high bits
                    // (above bit 20) and the token position in the low bits.
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Fallback tokenizer: whitespace split, keep alphanumerics,
            // lowercase (possibly multi-char per Unicode rules).
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words that were all punctuation produce no token and do
                // not advance the position counter.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the per-value scratch buffers into the shared indexes, one
        // inverted-index update per distinct term.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
624
625 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
626 use std::fmt::Write;
627
628 self.numeric_buffer.clear();
629 write!(self.numeric_buffer, "__num_{}", value).unwrap();
630 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
631 spur
632 } else {
633 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
634 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
635 spur
636 };
637
638 let term_key = TermKey {
639 field: field.0,
640 term: term_spur,
641 };
642
643 match self.inverted_index.entry(term_key) {
644 hashbrown::hash_map::Entry::Occupied(mut o) => {
645 o.get_mut().add(doc_id, 1);
646 self.estimated_memory += size_of::<CompactPosting>();
647 }
648 hashbrown::hash_map::Entry::Vacant(v) => {
649 let mut posting = PostingListBuilder::new();
650 posting.add(doc_id, 1);
651 v.insert(posting);
652 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
653 }
654 }
655
656 Ok(())
657 }
658
659 fn index_dense_vector_field(
661 &mut self,
662 field: Field,
663 doc_id: DocId,
664 ordinal: u16,
665 vector: &[f32],
666 ) -> Result<()> {
667 let dim = vector.len();
668
669 let builder = self
670 .dense_vectors
671 .entry(field.0)
672 .or_insert_with(|| DenseVectorBuilder::new(dim));
673
674 if builder.dim != dim && builder.len() > 0 {
676 return Err(crate::Error::Schema(format!(
677 "Dense vector dimension mismatch: expected {}, got {}",
678 builder.dim, dim
679 )));
680 }
681
682 builder.add(doc_id, ordinal, vector);
683
684 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
685
686 Ok(())
687 }
688
689 fn index_sparse_vector_field(
696 &mut self,
697 field: Field,
698 doc_id: DocId,
699 ordinal: u16,
700 entries: &[(u32, f32)],
701 ) -> Result<()> {
702 let weight_threshold = self
704 .schema
705 .get_field_entry(field)
706 .and_then(|entry| entry.sparse_vector_config.as_ref())
707 .map(|config| config.weight_threshold)
708 .unwrap_or(0.0);
709
710 let builder = self
711 .sparse_vectors
712 .entry(field.0)
713 .or_insert_with(SparseVectorBuilder::new);
714
715 builder.inc_vector_count();
716
717 for &(dim_id, weight) in entries {
718 if weight.abs() < weight_threshold {
720 continue;
721 }
722
723 let is_new_dim = !builder.postings.contains_key(&dim_id);
724 builder.add(dim_id, doc_id, ordinal, weight);
725 self.estimated_memory += size_of::<(DocId, u16, f32)>();
726 if is_new_dim {
727 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
730 }
731
732 Ok(())
733 }
734
    /// Serialize `doc` into the reusable buffer and append it to the
    /// temporary store file as a little-endian u32 length prefix followed
    /// by the serialized bytes.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // NOTE(review): assumes serialize_document_into resets/overwrites the
        // buffer rather than appending — confirm in store.rs.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }
747
    /// Consume the builder and write the finished segment to `dir`.
    ///
    /// Order of operations:
    /// 1. flush the temp document store;
    /// 2. write the positions file first (the term dictionary needs its
    ///    per-term offsets);
    /// 3. build postings/term-dict, store, dense vectors, sparse vectors and
    ///    fast fields concurrently via nested `rayon::join`;
    /// 4. record byte counts, finalize all writers, write segment metadata,
    ///    and delete the temp store file.
    ///
    /// `trained` optionally supplies pre-trained vector structures used when
    /// serializing dense vectors.
    ///
    /// # Errors
    /// Propagates the first failure from any writer or sub-build step.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Make sure every buffered document has reached the temp store file.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are written before postings so that the per-term offsets
        // they produce can be embedded in the term dictionary.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the parallel closures
        // below can own/borrow them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // One streaming writer per output file; optional files are only
        // created when there is data for them.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Run the five independent serialization jobs concurrently. The
        // nesting only shapes the result tuple; each closure owns disjoint
        // data and its own writer.
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first error only after every job has finished.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        // Capture byte counts before `finish()` consumes the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        // Metadata is written last so a segment with meta present is complete.
        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup; Drop also retries this.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
933}
934
935fn build_fast_fields_streaming(
937 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
938 num_docs: u32,
939 writer: &mut dyn Write,
940) -> Result<()> {
941 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
942
943 if fast_fields.is_empty() {
944 return Ok(());
945 }
946
947 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
949 field_ids.sort_unstable();
950
951 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
952 let mut current_offset = 0u64;
953
954 for &field_id in &field_ids {
955 let ff = fast_fields.get_mut(&field_id).unwrap();
956 ff.pad_to(num_docs);
957
958 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
959 toc.field_id = field_id;
960 current_offset += bytes_written;
961 toc_entries.push(toc);
962 }
963
964 let toc_offset = current_offset;
966 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
967
968 Ok(())
969}
970
/// Best-effort cleanup of the temporary store file when the builder is
/// dropped without `build` completing (e.g. on error or abandonment).
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Errors are ignored: `build` may already have removed the file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}