1pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16mod postings;
17mod sparse;
18mod store;
19
20pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
21
22use std::fs::{File, OpenOptions};
23use std::io::{BufWriter, Write};
24use std::mem::size_of;
25use std::path::PathBuf;
26
27use hashbrown::HashMap;
28use lasso::{Rodeo, Spur};
29use rustc_hash::FxHashMap;
30
31use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
32use std::sync::Arc;
33
34use crate::directories::{Directory, DirectoryWriter};
35use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
36use crate::tokenizer::BoxedTokenizer;
37use crate::{DocId, Result};
38
39use dense::DenseVectorBuilder;
40use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
41use sparse::SparseVectorBuilder;
42
/// Buffer size for the temporary on-disk document store writer (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted
/// index: key + posting-list builder + ~24 bytes of hash-map bookkeeping.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated heap cost of interning a new string in the `Rodeo` interner
/// (spur handle plus two map words), excluding the string bytes themselves.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of inserting a brand-new term into the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
56
/// Accumulates documents in memory (plus a temporary spill file for stored
/// documents) and serializes them into an immutable segment via `build`.
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field custom tokenizers resolved from the schema (or set manually).
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner: each term string is stored once, referenced by `Spur`.
    term_interner: Rodeo,

    /// Term -> in-memory posting list for text and numeric terms.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temporary on-disk document store.
    store_file: BufWriter<File>,
    /// Path of the temporary store file (removed after build and on drop).
    store_path: PathBuf,

    /// Next doc id to assign; doubles as the count of docs added so far.
    next_doc_id: DocId,

    /// Per-field token statistics (field id -> stats).
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat `docs x indexed-text-fields` matrix of per-document token counts.
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields (row width of `doc_field_lengths`).
    num_indexed_fields: usize,
    /// Field id -> column slot within a `doc_field_lengths` row.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: per-document term frequencies, reused across documents.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: per-document encoded positions per term, reused across docs
    /// (values are cleared; allocations are kept).
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for normalized tokens (default tokenization path).
    token_buffer: String,

    /// Scratch buffer for formatting numeric sentinel terms (`__num_<v>`).
    numeric_buffer: String,

    /// Field id -> dense-vector accumulator.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Field id -> sparse-vector accumulator.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Term -> per-document position lists (only for position-enabled fields).
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Field id -> position mode; only fields with positions enabled are
    /// inserted, so the inner `Option` is `Some` in practice.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field ordinal of the next value within the current document
    /// (multi-valued fields); cleared at the start of each document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap usage, maintained incrementally on insert.
    estimated_memory: usize,

    /// Scratch buffer for document serialization, reused across documents.
    doc_serialize_buffer: Vec<u8>,

    /// Field id -> columnar fast-field writer.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
132
133impl SegmentBuilder {
    /// Creates a builder for a fresh segment: opens a temporary on-disk
    /// document store and precomputes per-field indexing metadata from the
    /// schema (length slots, position modes, tokenizers, fast-field writers).
    ///
    /// # Errors
    /// Returns an error if the temporary store file cannot be created.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        // Unique temp-file name so concurrent builders never collide.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let registry = crate::tokenizer::TokenizerRegistry::new();
        // Assign a dense slot to every indexed text field so per-doc field
        // lengths can live in one flat Vec instead of a per-doc map.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                // Resolve the schema-named tokenizer up front; fields whose
                // tokenizer name is unknown fall back to the default
                // whitespace/lowercase path at index time.
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        // Pre-create a columnar fast-field writer for every `fast` field of a
        // supported type; unsupported types are silently skipped.
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
230
    /// Registers (or replaces) the custom tokenizer used for `field`,
    /// overriding whatever was resolved from the schema in `new`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
234
235 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
238 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
239 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
240 ordinal
241 }
242
    /// Number of documents added so far (the next doc id to be assigned).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
246
    /// Incrementally-maintained estimate of this builder's heap usage, used
    /// by callers to decide when to flush a segment.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
252
    /// Total number of distinct sparse dimensions buffered across all
    /// sparse-vector fields (sum of per-field posting-map sizes).
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }
257
    /// Produces a point-in-time snapshot of builder counters together with a
    /// back-of-the-envelope breakdown of estimated heap usage.
    ///
    /// All byte figures are estimates derived from container capacities and
    /// fixed per-entry overheads, not exact allocator measurements.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        // Vec header (ptr/len/cap), charged once per heap allocation.
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry bookkeeping cost assumed for hash maps.
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting storage: uses capacity (not len) so reserved-but-unused
        // space is counted too.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate assumes ~8 bytes per term string on average.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            // Two Vecs per builder: the flat f32 storage and the id/ordinal list.
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        // Recomputed-from-scratch total; `self.estimated_memory` is the
        // cheaper incremental counterpart maintained on each insert.
        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
377
    /// Indexes a document's fields according to their schema entries, records
    /// per-field statistics, and appends the document to the temporary store.
    /// Returns the assigned segment-local doc id.
    ///
    /// # Errors
    /// Propagates indexing and store-write failures.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row in the flat field-length matrix.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals (multi-valued fields) restart at 0 per document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip fields that are neither indexed nor fast; dense vectors
            // are exempt because stored-only vectors are still collected.
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the document once even for multi-valued fields.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        // NOTE(review): for multi-valued text the length slot
                        // keeps only the last value's token count — confirm
                        // this is intended rather than an accumulated sum.
                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // Indexed as the raw two's-complement bit pattern.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // Indexed as the IEEE-754 bit pattern (exact match only).
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                // Type/value mismatches and unsupported combinations are ignored.
                _ => {}
            }
        }

        // The whole document (stored fields) goes to the temporary store.
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
470
    /// Tokenizes `text` and merges its terms into the inverted index (and the
    /// position index when the field has positions enabled).
    ///
    /// Uses the field's custom tokenizer when one is registered; otherwise
    /// falls back to whitespace splitting with alphanumeric-only lowercasing.
    /// Returns the token count for this value, used as the field length.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reuse the scratch maps: clear contents but keep allocations.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Intern the term; charge the memory estimate only for
                // strings not seen before.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // Encoding: element ordinal in the bits above 20, token
                    // position in the low 20 bits (mode selects which parts).
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            // Custom path: field length is the full token count.
            token_position = tokens.len() as u32;
        } else {
            // Default tokenizer: split on whitespace, keep alphanumeric
            // chars, lowercase (may expand to multiple chars per char).
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric characters yield no token.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush the per-document buffers into the global indexes, charging
        // the memory estimate per appended posting / position.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
623
624 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
625 use std::fmt::Write;
626
627 self.numeric_buffer.clear();
628 write!(self.numeric_buffer, "__num_{}", value).unwrap();
629 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
630 spur
631 } else {
632 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
633 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
634 spur
635 };
636
637 let term_key = TermKey {
638 field: field.0,
639 term: term_spur,
640 };
641
642 match self.inverted_index.entry(term_key) {
643 hashbrown::hash_map::Entry::Occupied(mut o) => {
644 o.get_mut().add(doc_id, 1);
645 self.estimated_memory += size_of::<CompactPosting>();
646 }
647 hashbrown::hash_map::Entry::Vacant(v) => {
648 let mut posting = PostingListBuilder::new();
649 posting.add(doc_id, 1);
650 v.insert(posting);
651 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
652 }
653 }
654
655 Ok(())
656 }
657
658 fn index_dense_vector_field(
660 &mut self,
661 field: Field,
662 doc_id: DocId,
663 ordinal: u16,
664 vector: &[f32],
665 ) -> Result<()> {
666 let dim = vector.len();
667
668 let builder = self
669 .dense_vectors
670 .entry(field.0)
671 .or_insert_with(|| DenseVectorBuilder::new(dim));
672
673 if builder.dim != dim && builder.len() > 0 {
675 return Err(crate::Error::Schema(format!(
676 "Dense vector dimension mismatch: expected {}, got {}",
677 builder.dim, dim
678 )));
679 }
680
681 builder.add(doc_id, ordinal, vector);
682
683 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
684
685 Ok(())
686 }
687
688 fn index_sparse_vector_field(
695 &mut self,
696 field: Field,
697 doc_id: DocId,
698 ordinal: u16,
699 entries: &[(u32, f32)],
700 ) -> Result<()> {
701 let weight_threshold = self
703 .schema
704 .get_field_entry(field)
705 .and_then(|entry| entry.sparse_vector_config.as_ref())
706 .map(|config| config.weight_threshold)
707 .unwrap_or(0.0);
708
709 let builder = self
710 .sparse_vectors
711 .entry(field.0)
712 .or_insert_with(SparseVectorBuilder::new);
713
714 builder.inc_vector_count();
715
716 for &(dim_id, weight) in entries {
717 if weight.abs() < weight_threshold {
719 continue;
720 }
721
722 let is_new_dim = !builder.postings.contains_key(&dim_id);
723 builder.add(dim_id, doc_id, ordinal, weight);
724 self.estimated_memory += size_of::<(DocId, u16, f32)>();
725 if is_new_dim {
726 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
729 }
730
731 Ok(())
732 }
733
    /// Serializes `doc` and appends it to the temporary on-disk store as a
    /// length-prefixed (u32 little-endian) record.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // NOTE(review): assumes `serialize_document_into` resets/overwrites
        // `doc_serialize_buffer` rather than appending — confirm, otherwise
        // records would accumulate stale bytes from previous documents.
        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }
746
    /// Finalizes the segment: streams positions, postings, the document
    /// store, dense/sparse vectors, and fast fields into `dir`, writes the
    /// segment metadata, and removes the temporary store file.
    ///
    /// Positions are written first because the term dictionary embeds their
    /// offsets; the remaining artifacts are then built in parallel on the
    /// rayon pool, each branch operating on disjoint state.
    ///
    /// # Errors
    /// Propagates I/O and serialization failures from any writer or branch.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Ensure every buffered store record is on disk before it is re-read
        // by the store-building branch below.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Phase 1: positions. Their offsets feed the term dictionary.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the rayon closures can
        // own or borrow them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Offset-tracking writers for each output artifact; optional
        // artifacts only get a writer when there is data to write.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Phase 2: build the artifact groups concurrently. The nesting only
        // shapes the join tree; every closure touches disjoint writers/data.
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first failure from any parallel branch.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        // Capture byte counts before `finish()` consumes the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        // Release the large intermediate maps before metadata I/O.
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal of the temp store; `Drop` also tries this.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
932}
933
934fn build_fast_fields_streaming(
936 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
937 num_docs: u32,
938 writer: &mut dyn Write,
939) -> Result<()> {
940 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
941
942 if fast_fields.is_empty() {
943 return Ok(());
944 }
945
946 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
948 field_ids.sort_unstable();
949
950 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
951 let mut current_offset = 0u64;
952
953 for &field_id in &field_ids {
954 let ff = fast_fields.get_mut(&field_id).unwrap();
955 ff.pad_to(num_docs);
956
957 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
958 toc.field_id = field_id;
959 current_offset += bytes_written;
960 toc_entries.push(toc);
961 }
962
963 let toc_offset = current_offset;
965 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
966
967 Ok(())
968}
969
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file; the successful
        // build path also removes it, so failures here are ignored.
        let _ = std::fs::remove_file(&self.store_path);
    }
}