1mod config;
12mod dense;
13#[cfg(feature = "diagnostics")]
14mod diagnostics;
15mod postings;
16mod sparse;
17mod store;
18
19pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
20
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::mem::size_of;
24use std::path::PathBuf;
25
26use hashbrown::HashMap;
27use lasso::{Rodeo, Spur};
28use rustc_hash::FxHashMap;
29
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31use std::sync::Arc;
32
33use crate::directories::{Directory, DirectoryWriter};
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use dense::DenseVectorBuilder;
39use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
40use sparse::SparseVectorBuilder;
41
42const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
48
49const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
51
52const NEW_POS_TERM_OVERHEAD: usize =
54 size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
55
/// Incrementally builds one index segment in memory (postings, positions,
/// vectors, fast fields), spilling stored documents to a temporary file,
/// until `build` streams everything into a `Directory`.
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    /// Per-field custom tokenizers; fields without an entry fall back to the
    /// whitespace/lowercase path in `index_text_field`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings key on a compact `Spur`.
    term_interner: Rodeo,

    /// Main inverted index: (field, term) -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer appending to the temporary document store file.
    store_file: BufWriter<File>,
    /// Path of the temporary store file (removed after `build` and on drop).
    store_path: PathBuf,

    /// Next doc id to assign; doubles as the count of documents added.
    next_doc_id: DocId,

    /// Per-field token/document counters for scoring statistics.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened `num_docs x num_indexed_fields` matrix of token counts.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Field id -> column (slot) inside `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term -> frequency within the document currently indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: term -> encoded positions within the current document.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for building lowercased text tokens.
    token_buffer: String,

    /// Scratch buffer for synthetic numeric terms (`__num_*`).
    numeric_buffer: String,

    /// Per-field dense-vector builders keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Per-field sparse-vector builders keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Positional postings for fields that record positions.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields with positions enabled, with their encoding mode.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field element ordinal counters; reset for every document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap usage, maintained incrementally by the
    /// indexing paths.
    estimated_memory: usize,

    /// Reusable buffer for serializing documents into the store.
    doc_serialize_buffer: Vec<u8>,

    /// Columnar fast-field writers for fields flagged `fast`.
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
131
132impl SegmentBuilder {
    /// Creates a builder for a fresh segment.
    ///
    /// Opens (truncating) a uniquely-named temporary store file under
    /// `config.temp_dir`, resolves per-field tokenizers from the registry,
    /// assigns a `doc_field_lengths` slot to every indexed text field, and
    /// instantiates fast-field writers for every field flagged `fast`.
    ///
    /// # Errors
    /// Returns an error if the temporary store file cannot be created.
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        // Random UUID keeps concurrent builders from colliding in temp_dir.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Walk indexed text fields: give each a length slot, note position
        // modes, and bind any schema-declared tokenizer that the registry has.
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
                if let Some(ref tok_name) = entry.tokenizer
                    && let Some(tokenizer) = registry.get(tok_name)
                {
                    tokenizers.insert(field, tokenizer);
                }
            }
        }

        // Fast-field writers: numeric and text columns, single- or
        // multi-valued; other field types are silently skipped.
        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
        let mut fast_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.fast {
                let writer = if entry.multi {
                    match entry.field_type {
                        FieldType::U64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
                        }
                        FieldType::I64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
                        }
                        FieldType::F64 => {
                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
                        }
                        FieldType::Text => FastFieldWriter::new_text_multi(),
                        _ => continue,
                    }
                } else {
                    match entry.field_type {
                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
                        FieldType::Text => FastFieldWriter::new_text(),
                        _ => continue,
                    }
                };
                fast_fields.insert(field.0, writer);
            }
        }

        Ok(Self {
            schema,
            tokenizers,
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
            fast_fields,
        })
    }
229
    /// Installs (or replaces) the tokenizer used for `field`, overriding any
    /// tokenizer resolved from the schema in `new`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
233
234 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
237 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
238 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
239 ordinal
240 }
241
    /// Number of documents added so far (doc ids are assigned sequentially
    /// from 0, so the next id equals the count).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
245
    /// Running estimate of this builder's heap usage, maintained
    /// incrementally by the indexing paths.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
251
    /// Total number of distinct (field, dimension) posting lists currently
    /// buffered across all sparse-vector builders.
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }
256
    /// Produces a point-in-time snapshot of builder counters plus a per-
    /// component estimate of current heap usage.
    ///
    /// All byte figures are estimates computed from `capacity()`/`len()` and
    /// `size_of`; hashmap bookkeeping is approximated with a flat per-entry
    /// overhead and interned terms with an assumed average length.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed flat per-entry hashmap overhead (control bytes, padding).
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting vectors are charged at capacity, not length, since that is
        // what is actually allocated.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner arena contents are estimated via an assumed average term
        // length of 8 bytes.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors: per-dimension posting vectors plus inner and outer
        // hashmap entry overheads.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index: per-term position vectors plus map entry overheads.
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
376
    /// Indexes `doc`, assigns it the next sequential doc id, and appends it
    /// to the temporary document store.
    ///
    /// Dispatches per field on (schema type, value variant): text is
    /// tokenized into the inverted index, numerics become exact-match terms,
    /// and dense/sparse vectors go to their per-field builders. Fields
    /// flagged `fast` are also appended to the columnar fast-field writers.
    /// Mismatched type/value pairs are silently ignored.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row in the field-length matrix.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals for multi-valued fields restart every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Skip fields that are neither indexed nor fast — except dense
            // vectors, which may be stored-only (handled in the match below).
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
            {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    if entry.indexed {
                        let element_ordinal = self.next_element_ordinal(field.0);
                        let token_count =
                            self.index_text_field(*field, doc_id, text, element_ordinal)?;

                        let stats = self.field_stats.entry(field.0).or_default();
                        stats.total_tokens += token_count as u64;
                        // Count the doc once, regardless of how many values
                        // the field has.
                        if element_ordinal == 0 {
                            stats.doc_count += 1;
                        }

                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
                            self.doc_field_lengths[base_idx + slot] = token_count;
                        }
                    }

                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_text(doc_id, text);
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    if entry.indexed {
                        self.index_numeric_field(*field, doc_id, *v)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_u64(doc_id, *v);
                    }
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    if entry.indexed {
                        // Two's-complement bit cast gives a stable term key.
                        self.index_numeric_field(*field, doc_id, *v as u64)?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_i64(doc_id, *v);
                    }
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    if entry.indexed {
                        // Index the IEEE-754 bit pattern for exact matching.
                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
                    }
                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
                        ff.add_f64(doc_id, *v);
                    }
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
469
    /// Tokenizes `text` for `field` and merges the resulting term frequencies
    /// (and, when enabled, positions) into the segment-wide indexes.
    ///
    /// Uses the field's custom tokenizer if one is registered; otherwise
    /// falls back to whitespace splitting keeping lowercased alphanumerics.
    /// Returns the number of tokens produced.
    ///
    /// Position encoding depends on the field's `PositionMode`: `Ordinal`
    /// stores `element_ordinal << 20`, `TokenPosition` stores the raw token
    /// position, and `Full` ORs both (low 20 bits = token position).
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset per-call scratch state; position vectors are cleared in place
        // so their allocations are reused across calls.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Intern the term; charge memory only for first-time terms.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            // Returned token count for the custom-tokenizer path.
            token_position = tokens.len() as u32;
        } else {
            // Fallback: split on whitespace, keep alphanumerics, lowercase.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        // to_lowercase can expand one char into several.
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words with no alphanumeric content produce no token.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Merge this document's per-term frequencies (and positions) into the
        // segment-wide indexes, tracking estimated memory as we go.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
622
623 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
624 use std::fmt::Write;
625
626 self.numeric_buffer.clear();
627 write!(self.numeric_buffer, "__num_{}", value).unwrap();
628 let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
629 spur
630 } else {
631 let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
632 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
633 spur
634 };
635
636 let term_key = TermKey {
637 field: field.0,
638 term: term_spur,
639 };
640
641 match self.inverted_index.entry(term_key) {
642 hashbrown::hash_map::Entry::Occupied(mut o) => {
643 o.get_mut().add(doc_id, 1);
644 self.estimated_memory += size_of::<CompactPosting>();
645 }
646 hashbrown::hash_map::Entry::Vacant(v) => {
647 let mut posting = PostingListBuilder::new();
648 posting.add(doc_id, 1);
649 v.insert(posting);
650 self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
651 }
652 }
653
654 Ok(())
655 }
656
657 fn index_dense_vector_field(
659 &mut self,
660 field: Field,
661 doc_id: DocId,
662 ordinal: u16,
663 vector: &[f32],
664 ) -> Result<()> {
665 let dim = vector.len();
666
667 let builder = self
668 .dense_vectors
669 .entry(field.0)
670 .or_insert_with(|| DenseVectorBuilder::new(dim));
671
672 if builder.dim != dim && builder.len() > 0 {
674 return Err(crate::Error::Schema(format!(
675 "Dense vector dimension mismatch: expected {}, got {}",
676 builder.dim, dim
677 )));
678 }
679
680 builder.add(doc_id, ordinal, vector);
681
682 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
683
684 Ok(())
685 }
686
687 fn index_sparse_vector_field(
694 &mut self,
695 field: Field,
696 doc_id: DocId,
697 ordinal: u16,
698 entries: &[(u32, f32)],
699 ) -> Result<()> {
700 let weight_threshold = self
702 .schema
703 .get_field_entry(field)
704 .and_then(|entry| entry.sparse_vector_config.as_ref())
705 .map(|config| config.weight_threshold)
706 .unwrap_or(0.0);
707
708 let builder = self
709 .sparse_vectors
710 .entry(field.0)
711 .or_insert_with(SparseVectorBuilder::new);
712
713 builder.inc_vector_count();
714
715 for &(dim_id, weight) in entries {
716 if weight.abs() < weight_threshold {
718 continue;
719 }
720
721 let is_new_dim = !builder.postings.contains_key(&dim_id);
722 builder.add(dim_id, doc_id, ordinal, weight);
723 self.estimated_memory += size_of::<(DocId, u16, f32)>();
724 if is_new_dim {
725 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
728 }
729
730 Ok(())
731 }
732
733 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
735 use byteorder::{LittleEndian, WriteBytesExt};
736
737 super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
738
739 self.store_file
740 .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
741 self.store_file.write_all(&self.doc_serialize_buffer)?;
742
743 Ok(())
744 }
745
    /// Consumes the builder and writes the finished segment into `dir`.
    ///
    /// Flushes the temporary store, writes positions first (the postings
    /// writer needs their offsets), then streams out in parallel via nested
    /// `rayon::join`: term dictionary + postings, the compressed document
    /// store, dense vectors, sparse vectors, and fast fields. Finally writes
    /// the segment metadata and deletes the temporary store file.
    ///
    /// `trained` optionally supplies pre-trained vector structures consumed
    /// by the dense-vector serializer.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Ensure every buffered store record is on disk before re-reading it.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions are serialized up front so their offsets can be embedded
        // in the term dictionary / postings stream below.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large in-memory structures out of `self` so the parallel
        // closures below can own or borrow them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Vector/sparse/fast outputs are optional: only open writers for
        // components that actually have data.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        // Run the five serialization jobs concurrently on the rayon pool:
        // (postings | store) joined with ((vectors | sparse) | fast).
        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
            rayon::join(
                || {
                    rayon::join(
                        || {
                            postings::build_postings_streaming(
                                inverted_index,
                                term_interner,
                                &position_offsets,
                                &mut term_dict_writer,
                                &mut postings_writer,
                            )
                        },
                        || {
                            store::build_store_streaming(
                                &store_path,
                                num_compression_threads,
                                compression_level,
                                &mut store_writer,
                                num_docs,
                            )
                        },
                    )
                },
                || {
                    rayon::join(
                        || {
                            rayon::join(
                                || -> Result<()> {
                                    if let Some(ref mut w) = vectors_writer {
                                        dense::build_vectors_streaming(
                                            dense_vectors,
                                            schema,
                                            trained,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                                || -> Result<()> {
                                    if let Some(ref mut w) = sparse_writer {
                                        sparse::build_sparse_streaming(
                                            &mut sparse_vectors,
                                            schema,
                                            w,
                                        )?;
                                    }
                                    Ok(())
                                },
                            )
                        },
                        || -> Result<()> {
                            if let Some(ref mut w) = fast_writer {
                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                            }
                            Ok(())
                        },
                    )
                },
            );
        // Surface the first failure from any parallel job.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        fast_result?;

        // Capture written sizes (for logging) before finishing the writers.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        // Release large intermediates before the remaining (cheap) work.
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup; Drop also attempts this removal.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
931}
932
933fn build_fast_fields_streaming(
935 fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
936 num_docs: u32,
937 writer: &mut dyn Write,
938) -> Result<()> {
939 use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
940
941 if fast_fields.is_empty() {
942 return Ok(());
943 }
944
945 let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
947 field_ids.sort_unstable();
948
949 let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
950 let mut current_offset = 0u64;
951
952 for &field_id in &field_ids {
953 let ff = fast_fields.get_mut(&field_id).unwrap();
954 ff.pad_to(num_docs);
955
956 let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
957 toc.field_id = field_id;
958 current_offset += bytes_written;
959 toc_entries.push(toc);
960 }
961
962 let toc_offset = current_offset;
964 write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
965
966 Ok(())
967}
968
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort removal of the temporary store file; the error is
        // ignored because `build` may already have deleted it.
        let _ = std::fs::remove_file(&self.store_path);
    }
}