mod config;
mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
mod postings;
mod sparse;
mod store;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};

use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use dense::DenseVectorBuilder;
use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
use sparse::SparseVectorBuilder;

/// Write buffer size for the temporary document store spool (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted index:
/// the key, the posting-list builder, and fixed per-entry map overhead.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated heap cost of interning a new string: its `Spur` key plus arena
/// bookkeeping.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of inserting a brand-new term into the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
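/// In-memory builder for a single immutable segment.
///
/// Documents are accumulated into an inverted index (with optional positions),
/// dense/sparse vector builders, and fast-field columns, while stored documents
/// are spooled to a temporary file. [`SegmentBuilder::build`] then streams
/// everything into the target [`Directory`] and returns the segment metadata.
///
/// A minimal usage sketch (illustrative only; document construction is elided
/// and a `Default` impl for `SegmentBuilderConfig` is assumed):
///
/// ```ignore
/// let mut builder = SegmentBuilder::new(schema.clone(), SegmentBuilderConfig::default())?;
/// let doc_id = builder.add_document(doc)?;
/// let meta = builder.build(&directory, segment_id, None).await?;
/// ```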
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` keys.
    term_interner: Rodeo,

    /// Term -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary spool for stored documents, consumed at build time.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened `[doc][indexed-field-slot]` token counts.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term frequencies for the document currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: encoded positions for the document currently being indexed.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for lowercased tokens.
    token_buffer: String,

    /// Scratch buffer for formatting numeric terms.
    numeric_buffer: String,

    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Term -> per-document position lists, for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields with positions enabled, mapped to their `PositionMode`.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field element ordinal counters, reset for each document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running heuristic estimate of heap usage, updated on every insert.
    estimated_memory: usize,

    /// Reusable buffer for serializing documents into the store.
    doc_serialize_buffer: Vec<u8>,

    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}

impl SegmentBuilder {
    /// Creates a builder whose stored documents spool to a uniquely named
    /// temporary file under `config.temp_dir`.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a dense slot to every indexed text field so per-document
        // field lengths can live in one flat Vec.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

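    /// Returns the zero-based ordinal of the next element of a multi-valued
    /// field within the current document, incrementing the per-field counter.
    /// Counters are cleared at the start of every `add_document` call.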
    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
        let counter = self.current_element_ordinal.entry(field_id).or_insert(0);
        let ordinal = *counter;
        *counter += 1;
        ordinal
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

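    /// Returns a snapshot of builder statistics, including a heuristic memory
    /// estimate. The estimate prices each hash-map entry at a flat per-entry
    /// overhead and assumes an average interned term length of 8 bytes, so it
    /// is an approximation rather than an exact accounting.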
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Flat per-entry overhead assumed for hash-map bookkeeping.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Heuristic: assume an average interned term length of 8 bytes.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

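    /// Indexes and stores one document, returning its segment-local `DocId`.
    ///
    /// A minimal sketch of the contract (illustrative; `Document` construction
    /// is elided):
    ///
    /// ```ignore
    /// let doc_id = builder.add_document(doc)?;
    /// assert_eq!(doc_id + 1, builder.num_docs());
    /// ```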
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row of per-field token lengths.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals restart for every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip fields that are neither indexed nor fast; dense vectors are
            // exempt because stored-only vectors still flow through here.
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

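    /// Tokenizes `text` and merges the resulting term frequencies (and, for
    /// position-enabled fields, encoded positions) into the segment indexes.
    /// Returns the number of tokens produced.
    ///
    /// Positions pack the multi-value element ordinal into the high bits:
    /// `Full` mode stores `(element_ordinal << 20) | token_position`, so
    /// ordinal 2, token 5 encodes as `0x0020_0005`; `Ordinal` keeps only the
    /// high bits and `TokenPosition` only the token position.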
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reuse the scratch buffers; clearing the position Vecs in place keeps
        // their allocations for the next call.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Use the field's custom tokenizer if one was registered; otherwise
        // fall back to whitespace splitting with lowercased alphanumerics.
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Merge the per-document buffers into the segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }

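    /// Indexes a numeric value as the interned term `__num_{value}` with a
    /// term frequency of 1. Callers normalize to `u64` first: `i64` values are
    /// cast and `f64` values go through `to_bits()` (see `add_document`).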
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::fmt::Write;

        self.numeric_buffer.clear();
        write!(self.numeric_buffer, "__num_{}", value).unwrap();
        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += NEW_TERM_OVERHEAD;
        }
        if is_new_string {
            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
        }

        Ok(())
    }

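    /// Appends one dense vector (one element of a possibly multi-valued field)
    /// to the per-field builder. The dimension is fixed by the first vector;
    /// later mismatches are rejected once the builder holds data.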
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

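    /// Adds one sparse vector element to the per-field builder. Entries whose
    /// absolute weight falls below the schema's configured `weight_threshold`
    /// (default 0.0, i.e. keep everything) are dropped before posting.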
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        builder.inc_vector_count();

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory +=
                    size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

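    /// Appends `doc` to the temporary store spool as a length-prefixed record:
    /// a little-endian `u32` byte length followed by the serialized document.
    ///
    /// A reader of this framing would look roughly like (sketch, assuming any
    /// `std::io::Read` source and byteorder's `ReadBytesExt`):
    ///
    /// ```ignore
    /// let len = reader.read_u32::<LittleEndian>()? as usize;
    /// let mut buf = vec![0u8; len];
    /// reader.read_exact(&mut buf)?;
    /// ```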
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }

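    /// Flushes every in-memory structure to `dir` and returns the segment
    /// metadata. Positions are written first because the postings writer needs
    /// their offsets; the remaining files (term dict + postings, store,
    /// vectors, sparse, fast fields) are then built in parallel with nested
    /// `rayon::join`, each streaming into its own `OffsetWriter`.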
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Build positions first: the postings writer embeds their offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the parallel closures
        // below can take ownership without borrowing conflicts.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // The spool has been fully consumed by the store build; best-effort cleanup.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
}

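/// Serializes all fast-field columns into a single file: each field's column
/// is written in ascending field-id order, followed by a table of contents and
/// footer so readers can locate columns by field id. Columns are padded to
/// `num_docs` so every document has an entry.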
fn build_fast_fields_streaming(
    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
    num_docs: u32,
    writer: &mut dyn Write,
) -> Result<()> {
    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};

    if fast_fields.is_empty() {
        return Ok(());
    }

    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
    field_ids.sort_unstable();

    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
    let mut current_offset = 0u64;

    for &field_id in &field_ids {
        let ff = fast_fields.get_mut(&field_id).unwrap();
        ff.pad_to(num_docs);

        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
        toc.field_id = field_id;
        current_offset += bytes_written;
        toc_entries.push(toc);
    }

    let toc_offset = current_offset;
    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;

    Ok(())
}

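/// Best-effort cleanup of the temporary store spool; this is a no-op when
/// `build` already removed the file.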
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}