1mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::mem::size_of;
20use std::path::PathBuf;
21
22use hashbrown::HashMap;
23use lasso::{Rodeo, Spur};
24use rayon::prelude::*;
25use rustc_hash::FxHashMap;
26
27use crate::compression::CompressionLevel;
28
29use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
30use std::sync::Arc;
31
32use crate::directories::{Directory, DirectoryWriter};
33use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
34use crate::structures::{PostingList, SSTableWriter, TermInfo};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use posting::{
39 CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
40};
41use vectors::{DenseVectorBuilder, SparseVectorBuilder};
42
43use super::vector_data::FlatVectorData;
44
/// Capacity, in bytes, of the `BufWriter` placed in front of the temporary
/// document-store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Bytes added to the running memory estimate the first time a term key is
/// inserted into the inverted index: the key itself, the posting-list builder,
/// and a fixed 24-byte allowance.
// NOTE(review): the 24 presumably stands for hash-table bookkeeping per entry — confirm.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Bytes added to the running memory estimate for every newly interned string,
/// on top of the string's own byte length.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Bytes added to the running memory estimate the first time a term key is
/// inserted into the position index; the positional counterpart of
/// `NEW_TERM_OVERHEAD`.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
58
/// Accumulates documents in memory (postings, positions, vectors) and on disk
/// (a temporary document store), then writes them out as one segment via
/// `build`.
pub struct SegmentBuilder {
    /// Schema used to look up the field entry of every incoming field value.
    schema: Arc<Schema>,
    /// Builder settings (temp directory, map capacity, compression options).
    config: SegmentBuilderConfig,
    /// Custom tokenizers per field; text fields without an entry fall back to
    /// the built-in whitespace/lowercase tokenization.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings can be keyed by a compact `Spur`.
    term_interner: Rodeo,

    /// In-memory inverted index: (field, interned term) -> posting list builder.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temporary store file; documents are appended
    /// as a little-endian `u32` length followed by the serialized bytes.
    store_file: BufWriter<File>,
    /// Location of the temporary store file (removed in `build` and on drop).
    store_path: PathBuf,

    /// Id that will be assigned to the next added document.
    next_doc_id: DocId,

    /// Per-field token and document counters, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flat array of token counts: `num_indexed_fields` consecutive slots per
    /// document, indexed through `field_to_slot`.
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields in the schema (slots per document).
    num_indexed_fields: usize,
    /// Field id -> slot index inside a document's run of `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch map of term frequencies for the text value being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch map of encoded positions for the text value being indexed;
    /// the vectors are cleared, not dropped, between values.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer holding the normalized form of the current word.
    token_buffer: String,

    /// Scratch buffer holding the `__num_<value>` term of a numeric field.
    numeric_buffer: String,

    /// Dense vectors collected so far, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector postings collected so far, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// In-memory position index for fields that record positions.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Position mode of every indexed text field that has positions enabled.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field counter of values seen in the current document; cleared at
    /// the start of every `add_document` call.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running, incrementally maintained estimate of heap usage in bytes.
    estimated_memory: usize,
}
128
129impl SegmentBuilder {
130 pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
132 let segment_id = uuid::Uuid::new_v4();
133 let store_path = config
134 .temp_dir
135 .join(format!("hermes_store_{}.tmp", segment_id));
136
137 let store_file = BufWriter::with_capacity(
138 STORE_BUFFER_SIZE,
139 OpenOptions::new()
140 .create(true)
141 .write(true)
142 .truncate(true)
143 .open(&store_path)?,
144 );
145
146 let mut num_indexed_fields = 0;
149 let mut field_to_slot = FxHashMap::default();
150 let mut position_enabled_fields = FxHashMap::default();
151 for (field, entry) in schema.fields() {
152 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
153 field_to_slot.insert(field.0, num_indexed_fields);
154 num_indexed_fields += 1;
155 if entry.positions.is_some() {
156 position_enabled_fields.insert(field.0, entry.positions);
157 }
158 }
159 }
160
161 Ok(Self {
162 schema,
163 tokenizers: FxHashMap::default(),
164 term_interner: Rodeo::new(),
165 inverted_index: HashMap::with_capacity(config.posting_map_capacity),
166 store_file,
167 store_path,
168 next_doc_id: 0,
169 field_stats: FxHashMap::default(),
170 doc_field_lengths: Vec::new(),
171 num_indexed_fields,
172 field_to_slot,
173 local_tf_buffer: FxHashMap::default(),
174 local_positions: FxHashMap::default(),
175 token_buffer: String::with_capacity(64),
176 numeric_buffer: String::with_capacity(32),
177 config,
178 dense_vectors: FxHashMap::default(),
179 sparse_vectors: FxHashMap::default(),
180 position_index: HashMap::new(),
181 position_enabled_fields,
182 current_element_ordinal: FxHashMap::default(),
183 estimated_memory: 0,
184 })
185 }
186
    /// Registers `tokenizer` for `field`, replacing any tokenizer previously
    /// registered for the same field.
    ///
    /// Text values of fields without a registered tokenizer are indexed with
    /// the built-in tokenization of `index_text_field` (whitespace split,
    /// alphanumeric characters only, lowercased).
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
190
191 fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
194 let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
195 *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
196 ordinal
197 }
198
    /// Returns the number of document ids handed out so far, which is also the
    /// id the next added document will receive.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
202
    /// Returns the running memory estimate, in bytes, that the indexing
    /// methods maintain incrementally.
    ///
    /// This is a plain field read; `stats` instead recomputes an estimate from
    /// the current container capacities.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
208
209 pub fn sparse_dim_count(&self) -> usize {
211 self.sparse_vectors.values().map(|b| b.postings.len()).sum()
212 }
213
214 pub fn stats(&self) -> SegmentBuilderStats {
216 use std::mem::size_of;
217
218 let postings_in_memory: usize =
219 self.inverted_index.values().map(|p| p.postings.len()).sum();
220
221 let compact_posting_size = size_of::<CompactPosting>();
223 let vec_overhead = size_of::<Vec<u8>>(); let term_key_size = size_of::<TermKey>();
225 let posting_builder_size = size_of::<PostingListBuilder>();
226 let spur_size = size_of::<lasso::Spur>();
227 let sparse_entry_size = size_of::<(DocId, u16, f32)>();
228
229 let hashmap_entry_base_overhead = 8usize;
232
233 let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
235
236 let postings_bytes: usize = self
238 .inverted_index
239 .values()
240 .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
241 .sum();
242
243 let index_overhead_bytes = self.inverted_index.len()
245 * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
246
247 let interner_arena_overhead = 2 * size_of::<usize>();
250 let avg_term_len = 8; let interner_bytes =
252 self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
253
254 let field_lengths_bytes =
256 self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
257
258 let mut dense_vectors_bytes: usize = 0;
260 let mut dense_vector_count: usize = 0;
261 let doc_id_ordinal_size = size_of::<(DocId, u16)>();
262 for b in self.dense_vectors.values() {
263 dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
264 + b.doc_ids.capacity() * doc_id_ordinal_size
265 + 2 * vec_overhead; dense_vector_count += b.doc_ids.len();
267 }
268
269 let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
271 let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
272
273 let mut sparse_vectors_bytes: usize = 0;
275 for builder in self.sparse_vectors.values() {
276 for postings in builder.postings.values() {
277 sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
278 }
279 let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
281 sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
282 }
283 let outer_sparse_entry_size =
285 size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
286 sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
287
288 let mut position_index_bytes: usize = 0;
290 for pos_builder in self.position_index.values() {
291 for (_, positions) in &pos_builder.postings {
292 position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
293 }
294 let pos_entry_size = size_of::<DocId>() + vec_overhead;
296 position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
297 }
298 let pos_index_entry_size =
300 term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
301 position_index_bytes += self.position_index.len() * pos_index_entry_size;
302
303 let estimated_memory_bytes = postings_bytes
304 + index_overhead_bytes
305 + interner_bytes
306 + field_lengths_bytes
307 + dense_vectors_bytes
308 + local_tf_buffer_bytes
309 + sparse_vectors_bytes
310 + position_index_bytes;
311
312 let memory_breakdown = MemoryBreakdown {
313 postings_bytes,
314 index_overhead_bytes,
315 interner_bytes,
316 field_lengths_bytes,
317 dense_vectors_bytes,
318 dense_vector_count,
319 sparse_vectors_bytes,
320 position_index_bytes,
321 };
322
323 SegmentBuilderStats {
324 num_docs: self.next_doc_id,
325 unique_terms: self.inverted_index.len(),
326 postings_in_memory,
327 interned_strings: self.term_interner.len(),
328 doc_field_lengths_size: self.doc_field_lengths.len(),
329 estimated_memory_bytes,
330 memory_breakdown,
331 }
332 }
333
334 pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
336 let doc_id = self.next_doc_id;
337 self.next_doc_id += 1;
338
339 let base_idx = self.doc_field_lengths.len();
341 self.doc_field_lengths
342 .resize(base_idx + self.num_indexed_fields, 0);
343 self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
344
345 self.current_element_ordinal.clear();
347
348 for (field, value) in doc.field_values() {
349 let Some(entry) = self.schema.get_field_entry(*field) else {
350 continue;
351 };
352
353 if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
356 continue;
357 }
358
359 match (&entry.field_type, value) {
360 (FieldType::Text, FieldValue::Text(text)) => {
361 let element_ordinal = self.next_element_ordinal(field.0);
362 let token_count =
363 self.index_text_field(*field, doc_id, text, element_ordinal)?;
364
365 let stats = self.field_stats.entry(field.0).or_default();
366 stats.total_tokens += token_count as u64;
367 if element_ordinal == 0 {
368 stats.doc_count += 1;
369 }
370
371 if let Some(&slot) = self.field_to_slot.get(&field.0) {
372 self.doc_field_lengths[base_idx + slot] = token_count;
373 }
374 }
375 (FieldType::U64, FieldValue::U64(v)) => {
376 self.index_numeric_field(*field, doc_id, *v)?;
377 }
378 (FieldType::I64, FieldValue::I64(v)) => {
379 self.index_numeric_field(*field, doc_id, *v as u64)?;
380 }
381 (FieldType::F64, FieldValue::F64(v)) => {
382 self.index_numeric_field(*field, doc_id, v.to_bits())?;
383 }
384 (FieldType::DenseVector, FieldValue::DenseVector(vec))
385 if entry.indexed || entry.stored =>
386 {
387 let ordinal = self.next_element_ordinal(field.0);
388 self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
389 }
390 (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
391 let ordinal = self.next_element_ordinal(field.0);
392 self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
393 }
394 _ => {}
395 }
396 }
397
398 self.write_document_to_store(&doc)?;
400
401 Ok(doc_id)
402 }
403
    /// Tokenizes one text value and merges its terms into the inverted index
    /// and, for position-enabled fields, into the position index.
    ///
    /// Works in two phases: tokens are first tallied in the scratch buffers
    /// (`local_tf_buffer`, `local_positions`), then each distinct term is
    /// flushed once into the shared indexes. The running memory estimate is
    /// updated along the way.
    ///
    /// Returns the token count of this value: the number of tokens produced by
    /// the field's custom tokenizer, or, on the built-in path, the number of
    /// words that are non-empty after normalization.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        // `None` when the field does not record positions.
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset the scratch buffers. Position vectors are emptied in place so
        // their allocations are reused; keys from earlier calls stay in the map.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Run the custom tokenizer up front (if any) so the borrow of
        // `self.tokenizers` has ended before the other fields are mutated.
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                // Check before interning so only new strings are charged.
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // The element ordinal is shifted above the low 20 bits.
                    // NOTE(review): in `Full` mode a token position >= 2^20
                    // would overlap the ordinal bits — confirm upstream limits.
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            // With a custom tokenizer the reported count is the token count.
            token_position = tokens.len() as u32;
        } else {
            // Built-in tokenization: split on whitespace, keep alphanumeric
            // characters only, lowercase them.
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words without any alphanumeric character produce no token
                // and do not advance the position counter.
                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Flush phase: one posting (and one batch of positions) per distinct term.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }
546
547 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
548 use std::fmt::Write;
549
550 self.numeric_buffer.clear();
551 write!(self.numeric_buffer, "__num_{}", value).unwrap();
552 let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
553 let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);
554
555 let term_key = TermKey {
556 field: field.0,
557 term: term_spur,
558 };
559
560 let is_new_term = !self.inverted_index.contains_key(&term_key);
561 let posting = self
562 .inverted_index
563 .entry(term_key)
564 .or_insert_with(PostingListBuilder::new);
565 posting.add(doc_id, 1);
566
567 self.estimated_memory += size_of::<CompactPosting>();
568 if is_new_term {
569 self.estimated_memory += NEW_TERM_OVERHEAD;
570 }
571 if is_new_string {
572 self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
573 }
574
575 Ok(())
576 }
577
578 fn index_dense_vector_field(
580 &mut self,
581 field: Field,
582 doc_id: DocId,
583 ordinal: u16,
584 vector: &[f32],
585 ) -> Result<()> {
586 let dim = vector.len();
587
588 let builder = self
589 .dense_vectors
590 .entry(field.0)
591 .or_insert_with(|| DenseVectorBuilder::new(dim));
592
593 if builder.dim != dim && builder.len() > 0 {
595 return Err(crate::Error::Schema(format!(
596 "Dense vector dimension mismatch: expected {}, got {}",
597 builder.dim, dim
598 )));
599 }
600
601 builder.add(doc_id, ordinal, vector);
602
603 self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
604
605 Ok(())
606 }
607
608 fn index_sparse_vector_field(
615 &mut self,
616 field: Field,
617 doc_id: DocId,
618 ordinal: u16,
619 entries: &[(u32, f32)],
620 ) -> Result<()> {
621 let weight_threshold = self
623 .schema
624 .get_field_entry(field)
625 .and_then(|entry| entry.sparse_vector_config.as_ref())
626 .map(|config| config.weight_threshold)
627 .unwrap_or(0.0);
628
629 let builder = self
630 .sparse_vectors
631 .entry(field.0)
632 .or_insert_with(SparseVectorBuilder::new);
633
634 for &(dim_id, weight) in entries {
635 if weight.abs() < weight_threshold {
637 continue;
638 }
639
640 let is_new_dim = !builder.postings.contains_key(&dim_id);
641 builder.add(dim_id, doc_id, ordinal, weight);
642 self.estimated_memory += size_of::<(DocId, u16, f32)>();
643 if is_new_dim {
644 self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; }
647 }
648
649 Ok(())
650 }
651
652 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
654 use byteorder::{LittleEndian, WriteBytesExt};
655
656 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
657
658 self.store_file
659 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
660 self.store_file.write_all(&doc_bytes)?;
661
662 Ok(())
663 }
664
    /// Consumes the builder and writes the segment files for `segment_id`
    /// into `dir`, returning the segment metadata.
    ///
    /// Positions are written first because the postings step needs their
    /// offsets. Postings, the document store, dense vectors and sparse vectors
    /// are then produced concurrently with nested `rayon::join` calls, each
    /// into its own streaming writer. `trained` is forwarded to the dense
    /// vector step.
    ///
    /// # Errors
    /// Returns the first error from flushing, opening or finishing a writer,
    /// or from any build step. On an error from a build step the data writers
    /// are not finished.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Make sure every buffered document has reached the temp file before
        // it is read back by the store step.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions go first: their (offset, length) pairs are embedded in the
        // term dictionary entries written by the postings step.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = Self::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the remaining state out of `self` so it can be handed to the
        // parallel closures below.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Writers are opened up front because opening is async while the
        // build steps run in synchronous rayon closures. Vector writers are
        // only opened when there is vector data.
        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        // Four independent tasks: (postings, store) and (dense, sparse).
        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        Self::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        Self::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            Self::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            Self::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
                        }
                        Ok(())
                    },
                )
            },
        );
        // Surface failures before finishing any writer.
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        // Release the larger intermediate structures before writing the meta file.
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal of the temp store; `Drop` attempts it again.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
795
796 fn build_vectors_streaming(
801 dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
802 schema: &Schema,
803 trained: Option<&super::TrainedVectorStructures>,
804 writer: &mut dyn Write,
805 ) -> Result<()> {
806 use crate::dsl::{DenseVectorQuantization, VectorIndexType};
807
808 let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
809 .into_iter()
810 .filter(|(_, b)| b.len() > 0)
811 .collect();
812 fields.sort_by_key(|(id, _)| *id);
813
814 if fields.is_empty() {
815 return Ok(());
816 }
817
818 let quants: Vec<DenseVectorQuantization> = fields
820 .iter()
821 .map(|(field_id, _)| {
822 schema
823 .get_field_entry(Field(*field_id))
824 .and_then(|e| e.dense_vector_config.as_ref())
825 .map(|c| c.quantization)
826 .unwrap_or(DenseVectorQuantization::F32)
827 })
828 .collect();
829
830 let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
832 for (i, (_field_id, builder)) in fields.iter().enumerate() {
833 field_sizes.push(FlatVectorData::serialized_binary_size(
834 builder.dim,
835 builder.len(),
836 quants[i],
837 ));
838 }
839
840 use crate::segment::format::{DenseVectorTocEntry, write_dense_toc_and_footer};
841
842 let mut toc: Vec<DenseVectorTocEntry> = Vec::with_capacity(fields.len() * 2);
845 let mut current_offset = 0u64;
846
847 let ann_blobs: Vec<(u32, u8, Vec<u8>)> = if let Some(trained) = trained {
850 fields
851 .par_iter()
852 .filter_map(|(field_id, builder)| {
853 let config = schema
854 .get_field_entry(Field(*field_id))
855 .and_then(|e| e.dense_vector_config.as_ref())?;
856
857 let dim = builder.dim;
858 let blob = match config.index_type {
859 VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
860 let centroids = &trained.centroids[field_id];
861 let (mut index, codebook) =
862 super::ann_build::new_ivf_rabitq(dim, centroids);
863 for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
864 let v = &builder.vectors[i * dim..(i + 1) * dim];
865 index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
866 }
867 super::ann_build::serialize_ivf_rabitq(index, codebook)
868 .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
869 }
870 VectorIndexType::ScaNN
871 if trained.centroids.contains_key(field_id)
872 && trained.codebooks.contains_key(field_id) =>
873 {
874 let centroids = &trained.centroids[field_id];
875 let codebook = &trained.codebooks[field_id];
876 let mut index = super::ann_build::new_scann(dim, centroids, codebook);
877 for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
878 let v = &builder.vectors[i * dim..(i + 1) * dim];
879 index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
880 }
881 super::ann_build::serialize_scann(index, codebook)
882 .map(|b| (super::ann_build::SCANN_TYPE, b))
883 }
884 _ => return None,
885 };
886 match blob {
887 Ok((index_type, bytes)) => {
888 log::info!(
889 "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
890 index_type,
891 field_id,
892 builder.doc_ids.len(),
893 bytes.len()
894 );
895 Some((*field_id, index_type, bytes))
896 }
897 Err(e) => {
898 log::warn!(
899 "[segment_build] ANN serialize failed for field {}: {}",
900 field_id,
901 e
902 );
903 None
904 }
905 }
906 })
907 .collect()
908 } else {
909 Vec::new()
910 };
911
912 for (i, (_field_id, builder)) in fields.into_iter().enumerate() {
914 let data_offset = current_offset;
915 FlatVectorData::serialize_binary_from_flat_streaming(
916 builder.dim,
917 &builder.vectors,
918 &builder.doc_ids,
919 quants[i],
920 writer,
921 )
922 .map_err(crate::Error::Io)?;
923 current_offset += field_sizes[i] as u64;
924 toc.push(DenseVectorTocEntry {
925 field_id: _field_id,
926 index_type: super::ann_build::FLAT_TYPE,
927 offset: data_offset,
928 size: field_sizes[i] as u64,
929 });
930 let pad = (8 - (current_offset % 8)) % 8;
932 if pad > 0 {
933 writer.write_all(&[0u8; 8][..pad as usize])?;
934 current_offset += pad;
935 }
936 }
938
939 for (field_id, index_type, blob) in ann_blobs {
941 let data_offset = current_offset;
942 let blob_len = blob.len() as u64;
943 writer.write_all(&blob)?;
944 current_offset += blob_len;
945 toc.push(DenseVectorTocEntry {
946 field_id,
947 index_type,
948 offset: data_offset,
949 size: blob_len,
950 });
951 let pad = (8 - (current_offset % 8)) % 8;
952 if pad > 0 {
953 writer.write_all(&[0u8; 8][..pad as usize])?;
954 current_offset += pad;
955 }
956 }
957
958 write_dense_toc_and_footer(writer, current_offset, &toc)?;
960
961 Ok(())
962 }
963
964 fn build_sparse_streaming(
969 sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
970 schema: &Schema,
971 writer: &mut dyn Write,
972 ) -> Result<()> {
973 use crate::segment::format::{SparseFieldToc, write_sparse_toc_and_footer};
974 use crate::structures::{BlockSparsePostingList, WeightQuantization};
975
976 if sparse_vectors.is_empty() {
977 return Ok(());
978 }
979
980 let mut field_ids: Vec<u32> = sparse_vectors.keys().copied().collect();
982 field_ids.sort_unstable();
983
984 let mut field_tocs: Vec<SparseFieldToc> = Vec::new();
985 let mut current_offset = 0u64;
986
987 for &field_id in &field_ids {
988 let builder = sparse_vectors.get_mut(&field_id).unwrap();
989 if builder.is_empty() {
990 continue;
991 }
992
993 let field = crate::dsl::Field(field_id);
994 let sparse_config = schema
995 .get_field_entry(field)
996 .and_then(|e| e.sparse_vector_config.as_ref());
997
998 let quantization = sparse_config
999 .map(|c| c.weight_quantization)
1000 .unwrap_or(WeightQuantization::Float32);
1001
1002 let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
1003 let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);
1004
1005 let mut dims: Vec<_> = std::mem::take(&mut builder.postings).into_iter().collect();
1009 dims.sort_unstable_by_key(|(id, _)| *id);
1010
1011 let serialized_dims: Vec<(u32, Vec<u8>)> = dims
1012 .into_par_iter()
1013 .map(|(dim_id, mut postings)| {
1014 postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
1015
1016 if let Some(fraction) = pruning_fraction
1017 && postings.len() > 1
1018 && fraction < 1.0
1019 {
1020 let original_len = postings.len();
1021 postings.sort_by(|a, b| {
1022 b.2.abs()
1023 .partial_cmp(&a.2.abs())
1024 .unwrap_or(std::cmp::Ordering::Equal)
1025 });
1026 let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
1027 postings.truncate(keep);
1028 postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
1029 }
1030
1031 let block_list = BlockSparsePostingList::from_postings_with_block_size(
1032 &postings,
1033 quantization,
1034 block_size,
1035 )
1036 .map_err(crate::Error::Io)?;
1037
1038 let mut buf = Vec::new();
1039 block_list.serialize(&mut buf).map_err(crate::Error::Io)?;
1040 Ok((dim_id, buf))
1041 })
1042 .collect::<Result<Vec<_>>>()?;
1043
1044 let mut dim_entries: Vec<(u32, u64, u32)> = Vec::with_capacity(serialized_dims.len());
1046 for (dim_id, buf) in &serialized_dims {
1047 writer.write_all(buf)?;
1048 dim_entries.push((*dim_id, current_offset, buf.len() as u32));
1049 current_offset += buf.len() as u64;
1050 }
1051
1052 if !dim_entries.is_empty() {
1053 field_tocs.push(SparseFieldToc {
1054 field_id,
1055 quantization: quantization as u8,
1056 dims: dim_entries,
1057 });
1058 }
1059 }
1060
1061 if field_tocs.is_empty() {
1062 return Ok(());
1063 }
1064
1065 let toc_offset = current_offset;
1066 write_sparse_toc_and_footer(writer, toc_offset, &field_tocs).map_err(crate::Error::Io)?;
1067
1068 Ok(())
1069 }
1070
1071 fn build_positions_streaming(
1076 position_index: HashMap<TermKey, PositionPostingListBuilder>,
1077 term_interner: &Rodeo,
1078 writer: &mut dyn Write,
1079 ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
1080 use crate::structures::PositionPostingList;
1081
1082 let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1083
1084 let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
1086 .into_iter()
1087 .map(|(term_key, pos_builder)| {
1088 let term_str = term_interner.resolve(&term_key.term);
1089 let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
1090 key.extend_from_slice(&term_key.field.to_le_bytes());
1091 key.extend_from_slice(term_str.as_bytes());
1092 (key, pos_builder)
1093 })
1094 .collect();
1095
1096 entries.sort_by(|a, b| a.0.cmp(&b.0));
1097
1098 let mut current_offset = 0u64;
1099 let mut buf = Vec::new();
1100
1101 for (key, pos_builder) in entries {
1102 let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1103 for (doc_id, positions) in pos_builder.postings {
1104 pos_list.push(doc_id, positions);
1105 }
1106
1107 buf.clear();
1109 pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
1110 writer.write_all(&buf)?;
1111
1112 position_offsets.insert(key, (current_offset, buf.len() as u32));
1113 current_offset += buf.len() as u64;
1114 }
1115
1116 Ok(position_offsets)
1117 }
1118
1119 fn build_postings_streaming(
1124 inverted_index: HashMap<TermKey, PostingListBuilder>,
1125 term_interner: Rodeo,
1126 position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1127 term_dict_writer: &mut dyn Write,
1128 postings_writer: &mut dyn Write,
1129 ) -> Result<()> {
1130 let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
1132 .into_iter()
1133 .map(|(term_key, posting_list)| {
1134 let term_str = term_interner.resolve(&term_key.term);
1135 let mut key = Vec::with_capacity(4 + term_str.len());
1136 key.extend_from_slice(&term_key.field.to_le_bytes());
1137 key.extend_from_slice(term_str.as_bytes());
1138 (key, posting_list)
1139 })
1140 .collect();
1141
1142 drop(term_interner);
1143
1144 term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1145
1146 let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1150 .into_par_iter()
1151 .map(|(key, posting_builder)| {
1152 let has_positions = position_offsets.contains_key(&key);
1153
1154 if !has_positions {
1156 let doc_ids: Vec<u32> =
1157 posting_builder.postings.iter().map(|p| p.doc_id).collect();
1158 let term_freqs: Vec<u32> = posting_builder
1159 .postings
1160 .iter()
1161 .map(|p| p.term_freq as u32)
1162 .collect();
1163 if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
1164 return Ok((key, SerializedPosting::Inline(inline)));
1165 }
1166 }
1167
1168 let mut full_postings = PostingList::with_capacity(posting_builder.len());
1170 for p in &posting_builder.postings {
1171 full_postings.push(p.doc_id, p.term_freq as u32);
1172 }
1173
1174 let mut posting_bytes = Vec::new();
1175 let block_list =
1176 crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
1177 block_list.serialize(&mut posting_bytes)?;
1178 let result = SerializedPosting::External {
1179 bytes: posting_bytes,
1180 doc_count: full_postings.doc_count(),
1181 };
1182
1183 Ok((key, result))
1184 })
1185 .collect::<Result<Vec<_>>>()?;
1186
1187 let mut postings_offset = 0u64;
1189 let mut writer = SSTableWriter::<_, TermInfo>::new(term_dict_writer);
1190
1191 for (key, serialized_posting) in serialized {
1192 let term_info = match serialized_posting {
1193 SerializedPosting::Inline(info) => info,
1194 SerializedPosting::External { bytes, doc_count } => {
1195 let posting_len = bytes.len() as u32;
1196 postings_writer.write_all(&bytes)?;
1197
1198 let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1199 TermInfo::external_with_positions(
1200 postings_offset,
1201 posting_len,
1202 doc_count,
1203 pos_offset,
1204 pos_len,
1205 )
1206 } else {
1207 TermInfo::external(postings_offset, posting_len, doc_count)
1208 };
1209 postings_offset += posting_len as u64;
1210 info
1211 }
1212 };
1213
1214 writer.insert(&key, &term_info)?;
1215 }
1216
1217 let _ = writer.finish()?;
1218 Ok(())
1219 }
1220
1221 fn build_store_streaming(
1227 store_path: &PathBuf,
1228 num_compression_threads: usize,
1229 compression_level: CompressionLevel,
1230 writer: &mut dyn Write,
1231 ) -> Result<()> {
1232 use super::store::EagerParallelStoreWriter;
1233
1234 let file = File::open(store_path)?;
1235 let mmap = unsafe { memmap2::Mmap::map(&file)? };
1236
1237 let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1238 writer,
1239 num_compression_threads,
1240 compression_level,
1241 );
1242
1243 let mut offset = 0usize;
1246 while offset + 4 <= mmap.len() {
1247 let doc_len = u32::from_le_bytes([
1248 mmap[offset],
1249 mmap[offset + 1],
1250 mmap[offset + 2],
1251 mmap[offset + 3],
1252 ]) as usize;
1253 offset += 4;
1254
1255 if offset + doc_len > mmap.len() {
1256 break;
1257 }
1258
1259 let doc_bytes = &mmap[offset..offset + doc_len];
1260 store_writer.store_raw(doc_bytes)?;
1261 offset += doc_len;
1262 }
1263
1264 store_writer.finish()?;
1265 Ok(())
1266 }
1267}
1268
1269impl Drop for SegmentBuilder {
1270 fn drop(&mut self) {
1271 let _ = std::fs::remove_file(&self.store_path);
1273 }
1274}