1pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16mod postings;
17pub(crate) mod simhash;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23use std::fs::{File, OpenOptions};
24use std::io::{BufWriter, Write};
25use std::mem::size_of;
26use std::path::PathBuf;
27
28use hashbrown::HashMap;
29use lasso::{Rodeo, Spur};
30use rustc_hash::FxHashMap;
31
32use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
33use std::sync::Arc;
34
35use crate::directories::{Directory, DirectoryWriter};
36use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
37use crate::tokenizer::BoxedTokenizer;
38use crate::{DocId, Result};
39
40use dense::DenseVectorBuilder;
41use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
42use sparse::SparseVectorBuilder;
43
/// Capacity of the buffered writer over the temporary document store (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted index:
/// key + posting-list builder + ~24 bytes of assumed hash-map bucket overhead.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated per-string interner overhead (Spur handle plus two words of
/// arena bookkeeping), excluding the string bytes themselves.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of inserting a brand-new term into the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
57
/// Accumulates documents in memory and serializes them into an immutable
/// on-disk segment (term dictionary, postings, store, dense/sparse vectors,
/// fast fields) via [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field custom tokenizers; fields without one fall back to the
    /// built-in whitespace/lowercase path in `index_text_field`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings key on a compact `Spur` handle.
    term_interner: Rodeo,

    /// (field, term) -> in-memory posting list for indexed terms.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temporary on-disk document store.
    store_file: BufWriter<File>,
    /// Path of the temp store file; removed in `build` and again on `Drop`.
    store_path: PathBuf,

    /// Next document id to assign (equals the number of docs added so far).
    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat row-major matrix of per-document token counts:
    /// index = doc * num_indexed_fields + slot.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> slot index within a `doc_field_lengths` row.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term frequencies for the text value currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: encoded positions per term for the current text value.
    /// Values are cleared between calls; keys are kept to reuse allocations.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for lowercased token text (default tokenizer path).
    token_buffer: String,

    /// Scratch buffer for synthetic numeric terms ("__num_<value>").
    numeric_buffer: String,

    /// Dense-vector accumulators, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse-vector accumulators, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// (field, term) -> position postings, for position-enabled fields only.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Field id -> configured position mode (only populated for fields whose
    /// schema entry has `positions` set).
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-document counter of multi-value element ordinals per field;
    /// cleared at the start of each `add_document`.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running heap-usage estimate, updated incrementally on every insert.
    estimated_memory: usize,

    /// Reusable buffer for serializing documents before writing to the store.
    doc_serialize_buffer: Vec<u8>,

    /// Fast-field (columnar) writers, keyed by field id.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
133
134impl SegmentBuilder {
    /// Creates a builder for a fresh segment.
    ///
    /// Opens a uniquely named temporary store file under `config.temp_dir`,
    /// resolves per-field tokenizers for indexed text fields, records which
    /// fields carry positions, and pre-creates fast-field writers for every
    /// schema field flagged `fast` (plus an implicit u64 column for sparse
    /// fields with `simhash` enabled).
    ///
    /// # Errors
    /// Fails if the temporary store file cannot be created or opened.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        // Random UUID keeps concurrent builders from colliding on temp files.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let registry = crate::tokenizer::TokenizerRegistry::new();
        // Assign each indexed text field a dense slot used to address its
        // per-document token count in `doc_field_lengths`.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        // Columnar (fast) storage: numeric and text columns, single- or
        // multi-valued; field types with no columnar form are skipped.
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        // Sparse-vector fields with simhash enabled get a u64 fast column
        // for the fingerprint, unless an explicit column already exists.
        for (field, entry) in schema.fields() {
            if entry.simhash && entry.field_type == FieldType::SparseVector {
                fast_fields
                    .entry(field.0)
                    .or_insert_with(|| FastFieldWriter::new_numeric(FastFieldColumnType::U64));
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
240
    /// Installs (or replaces) the tokenizer used for `field`, overriding
    /// whatever `new` resolved from the schema's tokenizer registry.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
244
245 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
248 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
249 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
250 ordinal
251 }
252
    /// Number of documents added so far (doc ids are assigned sequentially
    /// from 0, so the next id doubles as the count).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
256
    /// Incrementally maintained estimate of this builder's heap usage, in
    /// bytes. Cheap to call; see `stats()` for a capacity-based breakdown.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
262
263 pub fn sparse_dim_count(&self) -> usize {
265 self.sparse_vectors.values().map(|b| b.postings.len()).sum()
266 }
267
    /// Builds a point-in-time snapshot of builder statistics with a
    /// capacity-based memory breakdown.
    ///
    /// Unlike `estimated_memory_bytes` (an incremental running total), this
    /// walks every structure and estimates from current `capacity()` values,
    /// using fixed guesses for hash-map bucket overhead and average term
    /// length — so the two figures will not match exactly.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        // Total postings entries buffered across all terms.
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed per-entry bucket/control overhead for hash maps.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting vectors: charged at capacity, plus the Vec header.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        // Map-entry overhead for the inverted index itself.
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate: assumes ~8 bytes of string data per term.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors: payload floats plus (doc_id, ordinal) bookkeeping.
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting vectors plus both map layers.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-doc position vectors plus map-entry overhead.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
387
    /// Indexes a single document and returns its assigned `DocId`.
    ///
    /// Routes each field value to the matching structure — inverted index
    /// for text/numeric terms, dense/sparse vector builders, fast-field
    /// columns — and appends the serialized document to the temp store.
    /// Fields absent from the schema, and values whose type does not match
    /// the schema's field type, are silently skipped.
    ///
    /// # Errors
    /// Propagates failures from field indexing or store serialization.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row of per-field token lengths.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals (multi-value counters) are document-scoped.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip fields with nothing to do. Dense vectors pass through
            // even when neither indexed nor fast (they may still be stored).
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        // Aggregate corpus stats; a document counts once per
                        // field (first element only), not once per element.
                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // For repeated text values the slot is overwritten,
                        // so it ends up holding the last element's count.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // Indexed term uses the raw bit pattern (i64 as u64).
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // Floats are indexed via their IEEE-754 bit pattern.
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let has_simhash = entry.simhash;
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                    // Simhash fingerprint is taken from the first element only.
                    if ordinal == 0
                        && has_simhash
                        && let Some(ff) = self.fast_fields.get_mut(&field.0)
                    {
                        ff.add_u64(doc_id, simhash::simhash_from_sparse_vector(entries));
                    }
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
488
    /// Tokenizes `text` and merges its terms into the inverted index (and,
    /// when enabled, the position index). Returns the token count.
    ///
    /// Two tokenization paths: a custom tokenizer registered for the field,
    /// or a default path that lowercases alphanumeric runs split on
    /// whitespace. Term frequencies are first accumulated per-call in
    /// `local_tf_buffer` so each distinct term touches the global maps once.
    ///
    /// Position encoding packs the multi-value element ordinal into the high
    /// bits: `Ordinal` -> ordinal << 20, `TokenPosition` -> raw position,
    /// `Full` -> (ordinal << 20) | position. Token positions are therefore
    /// assumed to fit in 20 bits.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset scratch state. local_positions keeps its keys and clears
        // only the values so the Vec allocations are reused across calls.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Intern the term, charging memory only for new strings.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            // Returned count is the tokenizer's token total.
            token_position = tokens.len() as u32;
        } else {
            // Default path: whitespace split, keep alphanumerics, lowercase.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        // to_lowercase() may yield multiple chars (Unicode).
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric chars produce no token and do
                // not advance the position counter.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the per-call term frequencies into the global indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
641
642 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
643 use std::fmt::Write;
644
645 self.numeric_buffer.clear();
646 write!(self.numeric_buffer, "__num_{}", value).unwrap();
647 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
648 spur
649 } else {
650 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
651 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
652 spur
653 };
654
655 let term_key = TermKey {
656 field: field.0,
657 term: term_spur,
658 };
659
660 match self.inverted_index.entry(term_key) {
661 hashbrown::hash_map::Entry::Occupied(mut o) => {
662 o.get_mut().add(doc_id, 1);
663 self.estimated_memory += size_of::<CompactPosting>();
664 }
665 hashbrown::hash_map::Entry::Vacant(v) => {
666 let mut posting = PostingListBuilder::new();
667 posting.add(doc_id, 1);
668 v.insert(posting);
669 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
670 }
671 }
672
673 Ok(())
674 }
675
676 fn index_dense_vector_field(
678 &mut self,
679 field: Field,
680 doc_id: DocId,
681 ordinal: u16,
682 vector: &[f32],
683 ) -> Result<()> {
684 let dim = vector.len();
685
686 let builder = self
687 .dense_vectors
688 .entry(field.0)
689 .or_insert_with(|| DenseVectorBuilder::new(dim));
690
691 if builder.dim != dim && builder.len() > 0 {
693 return Err(crate::Error::Schema(format!(
694 "Dense vector dimension mismatch: expected {}, got {}",
695 builder.dim, dim
696 )));
697 }
698
699 builder.add(doc_id, ordinal, vector);
700
701 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
702
703 Ok(())
704 }
705
    /// Buffers one sparse vector for `field`, dropping entries whose weight
    /// magnitude falls below the schema-configured `weight_threshold`
    /// (default 0.0, i.e. keep everything).
    ///
    /// Each surviving `(dim_id, weight)` entry is appended to that
    /// dimension's posting list along with `(doc_id, ordinal)`.
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        // Counted per vector (per element), even if all entries are pruned.
        builder.inc_vector_count();

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            // Checked before `add` so brand-new dimensions also get charged
            // for the map entry and posting-vector header.
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }
751
    /// Serializes `doc` into the reusable buffer and appends it to the
    /// temporary store file as a length-prefixed record (u32 little-endian
    /// length followed by the payload).
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // NOTE(review): assumes serialize_document_into clears/overwrites
        // doc_serialize_buffer rather than appending — confirm in store.rs,
        // otherwise records would accumulate prior documents.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }
764
    /// Finalizes the segment: flushes the temp store, serializes every
    /// on-disk structure through `dir`, logs the resulting sizes, persists
    /// `SegmentMeta`, and deletes the temporary store file.
    ///
    /// Positions are written first (their offsets feed the term dictionary);
    /// postings, store, dense vectors, sparse vectors, and fast fields are
    /// then serialized concurrently on rayon worker threads via nested
    /// `rayon::join` calls. `trained` optionally supplies pre-trained vector
    /// structures (e.g. quantizers) for dense-vector serialization.
    ///
    /// # Errors
    /// Propagates I/O and serialization failures from any writer.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Make sure every buffered store record is on disk before reading
        // the temp file back during store compression.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Write the position index first; its term offsets are needed when
        // the term dictionary is built below.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the parallel closures
        // below can own/borrow them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Open all output writers up front; optional outputs are skipped
        // entirely when their source structures are empty.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Fan out the five serialization jobs across rayon. The nesting only
        // shapes the join tree; all branches run concurrently.
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first failure only after every branch has finished.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        // Capture byte counts before finish() consumes the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal; Drop will retry if this fails.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
950}
951
952fn build_fast_fields_streaming(
954 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
955 num_docs: u32,
956 writer: &mut dyn Write,
957) -> Result<()> {
958 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
959
960 if fast_fields.is_empty() {
961 return Ok(());
962 }
963
964 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
966 field_ids.sort_unstable();
967
968 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
969 let mut current_offset = 0u64;
970
971 for &field_id in &field_ids {
972 let ff = fast_fields.get_mut(&field_id).unwrap();
973 ff.pad_to(num_docs);
974
975 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
976 toc.field_id = field_id;
977 current_offset += bytes_written;
978 toc_entries.push(toc);
979 }
980
981 let toc_offset = current_offset;
983 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
984
985 Ok(())
986}
987
impl Drop for SegmentBuilder {
    /// Best-effort removal of the temporary store file. The error is
    /// deliberately ignored: `build` may already have deleted the file.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}