1mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::{Path, PathBuf};

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

use super::vector_data::FlatVectorData;
42
/// Buffer size for the temporary on-disk document store (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Accumulates one segment's worth of documents in memory (postings,
/// positions, dense/sparse vectors) while spooling stored documents to a temp
/// file; `build()` flushes everything into immutable segment files.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    /// Per-field tokenizers registered via `set_tokenizer`.
    /// NOTE(review): the visible text-indexing path does not consult this map.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so postings key on a small `Spur` instead of `String`.
    term_interner: Rodeo,

    /// In-memory inverted index: (field, term) -> posting list under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temp store file (length-prefixed serialized docs).
    store_file: BufWriter<File>,
    /// Path of the temp store file; removed in `build()` and again on `Drop`.
    store_path: PathBuf,

    /// Next doc id to assign; also equals the number of docs added so far.
    next_doc_id: DocId,

    /// Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened [num_docs x num_indexed_fields] token counts for indexed text fields.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    /// Maps an indexed text field id to its slot in a doc's `doc_field_lengths` row.
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch: term -> frequency for the text value currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch: term -> encoded positions for the current text value
    /// (map entries persist across calls; only the Vecs are cleared, for reuse).
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer used to build one lowercased token at a time.
    token_buffer: String,

    /// Dense-vector accumulators, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse-vector accumulators, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields that have positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields with positions enabled, and which `PositionMode` each uses.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field ordinal of the current multi-valued element within one document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running incremental estimate of heap usage; see `recalibrate_memory`.
    estimated_memory: usize,
}
112
113impl SegmentBuilder {
    /// Creates a builder for `schema`, opening a fresh temp file (named with a
    /// random UUID under `config.temp_dir`) that spools stored documents until
    /// `build()` is called.
    ///
    /// # Errors
    /// Returns an error if the temp store file cannot be created.
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a doc_field_lengths slot to every indexed text field, and
        // remember which of those fields want positions recorded.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }
169
    /// Registers a custom tokenizer for `field`.
    ///
    /// NOTE(review): `index_text_field` uses a built-in whitespace/lowercase
    /// tokenizer and does not consult this map — confirm where registered
    /// tokenizers are actually applied.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
173
    /// Number of documents added so far (ids are assigned sequentially from 0,
    /// so the next id to assign equals the count).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
177
    /// Cheap accessor for the incrementally-maintained memory estimate; see
    /// `recalibrate_memory` / `stats` for the full recomputation.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
183
    /// Replaces the drift-prone incremental memory estimate with the full
    /// capacity-based recomputation performed by `stats()`.
    pub fn recalibrate_memory(&mut self) {
        self.estimated_memory = self.stats().estimated_memory_bytes;
    }
191
192 pub fn sparse_dim_count(&self) -> usize {
194 self.sparse_vectors.values().map(|b| b.postings.len()).sum()
195 }
196
    /// Computes a full snapshot of builder statistics, including a from-scratch
    /// estimate of heap usage broken down by component.
    ///
    /// The byte figures are heuristics: they combine container *capacities*
    /// with assumed per-entry hash-map overheads and an assumed average term
    /// length, so they approximate rather than measure true allocation size.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        // Size of a Vec header (ptr/len/cap), counted once per owned Vec.
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed bookkeeping bytes per hash-map entry (control bytes, padding).
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Posting storage is charged by capacity, not length.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Interner estimate: arena bookkeeping plus an assumed 8-byte average term.
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            // Two Vecs per builder: flat f32 data and (doc_id, ordinal) pairs.
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
316
    /// Adds one document, assigning it the next sequential `DocId`.
    ///
    /// Each schema-known field value is dispatched by type to the matching
    /// indexer; per-field token statistics and field lengths are tracked, and
    /// the serialized document is appended to the temp store. Repeated values
    /// for the same field advance a per-field element ordinal (multi-value
    /// support).
    ///
    /// # Errors
    /// Fails on indexing errors (e.g. dense-vector dimension mismatch) or on
    /// I/O errors while writing to the temp store.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row of per-field token counts.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Element ordinals restart at 0 for every document.
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() {
                continue;
            }

            let entry = entry.unwrap();
            // Dense vectors are processed even when not `indexed` (they may be
            // stored-only); every other type requires `indexed`.
            let dominated_by_index = matches!(&entry.field_type, FieldType::DenseVector);
            if !dominated_by_index && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    // Count the document once per field, not once per element.
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    // NOTE(review): for multi-valued text fields this stores
                    // the LAST element's token count rather than accumulating
                    // — confirm that is the intended field-length semantics.
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // i64 is reinterpreted as u64 (wrapping for negatives);
                    // exact-match lookups must apply the same mapping.
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // f64 is indexed by its raw bit pattern (exact match only).
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                _ => {}
            }
        }

        // Spool the full document to the temp store for later compression.
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }
406
    /// Tokenizes `text` (split on whitespace, keep alphanumeric chars,
    /// lowercase) and merges the resulting term frequencies — and, when the
    /// field has positions enabled, encoded positions — into the in-memory
    /// indexes. Returns the number of tokens produced.
    ///
    /// NOTE(review): the `tokenizers` map is not consulted here; this built-in
    /// tokenization is always used — confirm whether custom tokenizers are
    /// applied elsewhere.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reset scratch buffers; position Vecs stay allocated for reuse.
        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            // Build the lowercased, alphanumeric-only token in a reused buffer.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            // Charge interner growth only for strings not seen before.
            let is_new_string = !self.term_interner.contains(&self.token_buffer);
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            if is_new_string {
                use std::mem::size_of;
                self.estimated_memory +=
                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
            }
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                // Packed encoding: element ordinal in the upper 12 bits, token
                // position in the lower 20. NOTE(review): ordinals >= 4096 or
                // positions >= 2^20 would overflow/collide — confirm bounds
                // are enforced upstream.
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                self.local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Merge this element's term frequencies into the shared indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory +=
                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
                }
            }
        }

        Ok(token_position)
    }
537
538 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
539 use std::mem::size_of;
540
541 let term_str = format!("__num_{}", value);
543 let is_new_string = !self.term_interner.contains(&term_str);
544 let term_spur = self.term_interner.get_or_intern(&term_str);
545
546 let term_key = TermKey {
547 field: field.0,
548 term: term_spur,
549 };
550
551 let is_new_term = !self.inverted_index.contains_key(&term_key);
552 let posting = self
553 .inverted_index
554 .entry(term_key)
555 .or_insert_with(PostingListBuilder::new);
556 posting.add(doc_id, 1);
557
558 self.estimated_memory += size_of::<CompactPosting>();
560 if is_new_term {
561 self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
562 }
563 if is_new_string {
564 self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
565 }
566
567 Ok(())
568 }
569
570 fn index_dense_vector_field(
572 &mut self,
573 field: Field,
574 doc_id: DocId,
575 ordinal: u16,
576 vector: &[f32],
577 ) -> Result<()> {
578 let dim = vector.len();
579
580 let builder = self
581 .dense_vectors
582 .entry(field.0)
583 .or_insert_with(|| DenseVectorBuilder::new(dim));
584
585 if builder.dim != dim && builder.len() > 0 {
587 return Err(crate::Error::Schema(format!(
588 "Dense vector dimension mismatch: expected {}, got {}",
589 builder.dim, dim
590 )));
591 }
592
593 builder.add(doc_id, ordinal, vector);
594
595 use std::mem::{size_of, size_of_val};
597 self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
598
599 Ok(())
600 }
601
    /// Adds sparse-vector entries for one element of `field`, dropping weights
    /// whose magnitude falls below the schema-configured threshold (threshold
    /// 0.0 — i.e. keep everything — when no config is present).
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            use std::mem::size_of;
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                // New per-dimension list: key + Vec header + assumed map overhead.
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }
647
648 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
650 use byteorder::{LittleEndian, WriteBytesExt};
651
652 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
653
654 self.store_file
655 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
656 self.store_file.write_all(&doc_bytes)?;
657
658 Ok(())
659 }
660
    /// Consumes the builder and writes all segment files into `dir`.
    ///
    /// Positions are flushed first because their (offset, len) map must be
    /// embedded in the term dictionary; then postings/store and dense/sparse
    /// vectors are built concurrently on two nested `rayon::join` pairs, each
    /// closure streaming into its own directory writer. Finally the segment
    /// metadata is written and the temp store file removed.
    ///
    /// # Errors
    /// Propagates any I/O or serialization failure from the sub-builders.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Make sure every spooled document is on disk before re-reading it.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions first: the postings pass consumes these offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = Self::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Move the large structures out of `self` so the rayon closures can
        // own or borrow them independently.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        // Four independent build passes run concurrently, each with exclusive
        // access to its own writer.
        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        Self::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        Self::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            Self::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            Self::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
                        }
                        Ok(())
                    },
                )
            },
        );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        // Free the large intermediate maps before the final metadata write.
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal; `Drop` retries if this fails.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
791
    /// Serializes dense-vector data for every non-empty field into the vectors
    /// file: flat (brute-force) blocks first, then any ANN indexes built from
    /// pre-trained structures, each block padded to 8-byte alignment, followed
    /// by a TOC and footer.
    fn build_vectors_streaming(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
        trained: Option<&super::TrainedVectorStructures>,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::dsl::{DenseVectorQuantization, VectorIndexType};

        // Deterministic output: fields sorted by id, empty builders dropped.
        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
            .into_iter()
            .filter(|(_, b)| b.len() > 0)
            .collect();
        fields.sort_by_key(|(id, _)| *id);

        if fields.is_empty() {
            return Ok(());
        }

        // Per-field quantization, defaulting to F32 when unconfigured.
        let quants: Vec<DenseVectorQuantization> = fields
            .iter()
            .map(|(field_id, _)| {
                schema
                    .get_field_entry(Field(*field_id))
                    .and_then(|e| e.dense_vector_config.as_ref())
                    .map(|c| c.quantization)
                    .unwrap_or(DenseVectorQuantization::F32)
            })
            .collect();

        // Precompute serialized sizes so offsets can be tracked while
        // streaming. NOTE(review): assumes serialized_binary_size exactly
        // matches what serialize_binary_from_flat_streaming writes — confirm
        // the two stay in sync.
        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
        for (i, (_field_id, builder)) in fields.iter().enumerate() {
            field_sizes.push(FlatVectorData::serialized_binary_size(
                builder.dim,
                builder.len(),
                quants[i],
            ));
        }

        use crate::segment::format::{DenseVectorTocEntry, write_dense_toc_and_footer};

        // Up to two TOC entries per field: one flat block plus one ANN block.
        let mut toc: Vec<DenseVectorTocEntry> = Vec::with_capacity(fields.len() * 2);
        let mut current_offset = 0u64;

        // Build ANN indexes in parallel (one task per eligible field). Fields
        // without the required trained structures are skipped and remain
        // searchable through their flat block only.
        let ann_blobs: Vec<(u32, u8, Vec<u8>)> = if let Some(trained) = trained {
            fields
                .par_iter()
                .filter_map(|(field_id, builder)| {
                    let config = schema
                        .get_field_entry(Field(*field_id))
                        .and_then(|e| e.dense_vector_config.as_ref())?;

                    let dim = builder.dim;
                    let blob = match config.index_type {
                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
                            let centroids = &trained.centroids[field_id];
                            let (mut index, codebook) =
                                super::ann_build::new_ivf_rabitq(dim, centroids);
                            // Re-add every stored vector to the fresh index.
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_ivf_rabitq(index, codebook)
                                .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
                        }
                        VectorIndexType::ScaNN
                            if trained.centroids.contains_key(field_id)
                                && trained.codebooks.contains_key(field_id) =>
                        {
                            let centroids = &trained.centroids[field_id];
                            let codebook = &trained.codebooks[field_id];
                            let mut index = super::ann_build::new_scann(dim, centroids, codebook);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_scann(index, codebook)
                                .map(|b| (super::ann_build::SCANN_TYPE, b))
                        }
                        _ => return None,
                    };
                    // Serialization failure is logged and tolerated: the flat
                    // block still makes the field searchable.
                    match blob {
                        Ok((index_type, bytes)) => {
                            log::info!(
                                "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
                                index_type,
                                field_id,
                                builder.doc_ids.len(),
                                bytes.len()
                            );
                            Some((*field_id, index_type, bytes))
                        }
                        Err(e) => {
                            log::warn!(
                                "[segment_build] ANN serialize failed for field {}: {}",
                                field_id,
                                e
                            );
                            None
                        }
                    }
                })
                .collect()
        } else {
            Vec::new()
        };

        // Stream the flat vector blocks, padding each to 8-byte alignment.
        for (i, (_field_id, builder)) in fields.into_iter().enumerate() {
            let data_offset = current_offset;
            FlatVectorData::serialize_binary_from_flat_streaming(
                builder.dim,
                &builder.vectors,
                &builder.doc_ids,
                quants[i],
                writer,
            )
            .map_err(crate::Error::Io)?;
            current_offset += field_sizes[i] as u64;
            toc.push(DenseVectorTocEntry {
                field_id: _field_id,
                index_type: super::ann_build::FLAT_TYPE,
                offset: data_offset,
                size: field_sizes[i] as u64,
            });
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        // Append the ANN blobs with the same alignment rule.
        for (field_id, index_type, blob) in ann_blobs {
            let data_offset = current_offset;
            let blob_len = blob.len() as u64;
            writer.write_all(&blob)?;
            current_offset += blob_len;
            toc.push(DenseVectorTocEntry {
                field_id,
                index_type,
                offset: data_offset,
                size: blob_len,
            });
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        write_dense_toc_and_footer(writer, current_offset, &toc)?;

        Ok(())
    }
959
    /// Serializes sparse-vector postings for every non-empty field into the
    /// sparse file: per-dimension block posting lists (optionally pruned to
    /// the highest-magnitude fraction of entries), followed by a TOC and
    /// footer. Writes nothing at all when there is no data.
    ///
    /// Takes `&mut` builders so each field's posting map can be drained with
    /// `mem::take` instead of cloned.
    fn build_sparse_streaming(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::segment::format::{SparseFieldToc, write_sparse_toc_and_footer};
        use crate::structures::{BlockSparsePostingList, WeightQuantization};

        if sparse_vectors.is_empty() {
            return Ok(());
        }

        // Deterministic field order.
        let mut field_ids: Vec<u32> = sparse_vectors.keys().copied().collect();
        field_ids.sort_unstable();

        let mut field_tocs: Vec<SparseFieldToc> = Vec::new();
        let mut current_offset = 0u64;

        for &field_id in &field_ids {
            let builder = sparse_vectors.get_mut(&field_id).unwrap();
            if builder.is_empty() {
                continue;
            }

            // Per-field serialization settings from the schema, with defaults.
            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            // Drain and sort dimensions for deterministic output.
            let mut dims: Vec<_> = std::mem::take(&mut builder.postings).into_iter().collect();
            dims.sort_unstable_by_key(|(id, _)| *id);

            // Serialize each dimension's posting list in parallel.
            let serialized_dims: Vec<(u32, Vec<u8>)> = dims
                .into_par_iter()
                .map(|(dim_id, mut postings)| {
                    postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                    // Optional pruning: keep the top `fraction` of entries by
                    // |weight| (at least one), then restore (doc, ordinal) order.
                    if let Some(fraction) = pruning_fraction
                        && postings.len() > 1
                        && fraction < 1.0
                    {
                        let original_len = postings.len();
                        postings.sort_by(|a, b| {
                            b.2.abs()
                                .partial_cmp(&a.2.abs())
                                .unwrap_or(std::cmp::Ordering::Equal)
                        });
                        let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                        postings.truncate(keep);
                        postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                    }

                    let block_list = BlockSparsePostingList::from_postings_with_block_size(
                        &postings,
                        quantization,
                        block_size,
                    )
                    .map_err(crate::Error::Io)?;

                    let mut buf = Vec::new();
                    block_list.serialize(&mut buf).map_err(crate::Error::Io)?;
                    Ok((dim_id, buf))
                })
                .collect::<Result<Vec<_>>>()?;

            // Sequential write pass records each dimension's (offset, len).
            let mut dim_entries: Vec<(u32, u64, u32)> = Vec::with_capacity(serialized_dims.len());
            for (dim_id, buf) in &serialized_dims {
                writer.write_all(buf)?;
                dim_entries.push((*dim_id, current_offset, buf.len() as u32));
                current_offset += buf.len() as u64;
            }

            if !dim_entries.is_empty() {
                field_tocs.push(SparseFieldToc {
                    field_id,
                    quantization: quantization as u8,
                    dims: dim_entries,
                });
            }
        }

        if field_tocs.is_empty() {
            return Ok(());
        }

        let toc_offset = current_offset;
        write_sparse_toc_and_footer(writer, toc_offset, &field_tocs).map_err(crate::Error::Io)?;

        Ok(())
    }
1066
1067 fn build_positions_streaming(
1072 position_index: HashMap<TermKey, PositionPostingListBuilder>,
1073 term_interner: &Rodeo,
1074 writer: &mut dyn Write,
1075 ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
1076 use crate::structures::PositionPostingList;
1077
1078 let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1079
1080 let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
1082 .into_iter()
1083 .map(|(term_key, pos_builder)| {
1084 let term_str = term_interner.resolve(&term_key.term);
1085 let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
1086 key.extend_from_slice(&term_key.field.to_le_bytes());
1087 key.extend_from_slice(term_str.as_bytes());
1088 (key, pos_builder)
1089 })
1090 .collect();
1091
1092 entries.sort_by(|a, b| a.0.cmp(&b.0));
1093
1094 let mut current_offset = 0u64;
1095 let mut buf = Vec::new();
1096
1097 for (key, pos_builder) in entries {
1098 let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1099 for (doc_id, positions) in pos_builder.postings {
1100 pos_list.push(doc_id, positions);
1101 }
1102
1103 buf.clear();
1105 pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
1106 writer.write_all(&buf)?;
1107
1108 position_offsets.insert(key, (current_offset, buf.len() as u32));
1109 current_offset += buf.len() as u64;
1110 }
1111
1112 Ok(position_offsets)
1113 }
1114
    /// Builds the term dictionary (SSTable) and postings file from the
    /// in-memory inverted index.
    ///
    /// Keys are `field_id (LE u32) || term bytes`, sorted so the SSTable can
    /// be written in order. Lists without positions may be inlined directly in
    /// their `TermInfo`; all others are block-compressed into the postings
    /// file, with the position (offset, len) attached when present in
    /// `position_offsets`.
    fn build_postings_streaming(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
        term_dict_writer: &mut dyn Write,
        postings_writer: &mut dyn Write,
    ) -> Result<()> {
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // The interner is no longer needed; free it before the heavy phases.
        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize every posting list in parallel.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let has_positions = position_offsets.contains_key(&key);

                // Terms without positions may qualify for inline storage
                // (doc ids + term freqs packed into the TermInfo itself).
                if !has_positions {
                    let doc_ids: Vec<u32> =
                        posting_builder.postings.iter().map(|p| p.doc_id).collect();
                    let term_freqs: Vec<u32> = posting_builder
                        .postings
                        .iter()
                        .map(|p| p.term_freq as u32)
                        .collect();
                    if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                        return Ok((key, SerializedPosting::Inline(inline)));
                    }
                }

                // Otherwise block-compress the full posting list.
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let mut posting_bytes = Vec::new();
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut posting_bytes)?;
                let result = SerializedPosting::External {
                    bytes: posting_bytes,
                    doc_count: full_postings.doc_count(),
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        // Single-threaded write pass: append external postings and insert
        // every term into the SSTable in sorted key order.
        let mut postings_offset = 0u64;
        let mut writer = SSTableWriter::<_, TermInfo>::new(term_dict_writer);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_len = bytes.len() as u32;
                    postings_writer.write_all(&bytes)?;

                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            postings_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(postings_offset, posting_len, doc_count)
                    };
                    postings_offset += posting_len as u64;
                    info
                }
            };

            writer.insert(&key, &term_info)?;
        }

        let _ = writer.finish()?;
        Ok(())
    }
1216
1217 fn build_store_streaming(
1223 store_path: &PathBuf,
1224 num_compression_threads: usize,
1225 compression_level: CompressionLevel,
1226 writer: &mut dyn Write,
1227 ) -> Result<()> {
1228 use super::store::EagerParallelStoreWriter;
1229
1230 let file = File::open(store_path)?;
1231 let mmap = unsafe { memmap2::Mmap::map(&file)? };
1232
1233 let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1234 writer,
1235 num_compression_threads,
1236 compression_level,
1237 );
1238
1239 let mut offset = 0usize;
1242 while offset + 4 <= mmap.len() {
1243 let doc_len = u32::from_le_bytes([
1244 mmap[offset],
1245 mmap[offset + 1],
1246 mmap[offset + 2],
1247 mmap[offset + 3],
1248 ]) as usize;
1249 offset += 4;
1250
1251 if offset + doc_len > mmap.len() {
1252 break;
1253 }
1254
1255 let doc_bytes = &mmap[offset..offset + doc_len];
1256 store_writer.store_raw(doc_bytes)?;
1257 offset += doc_len;
1258 }
1259
1260 store_writer.finish()?;
1261 Ok(())
1262 }
1263}
1264
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temp store file; the error is ignored
        // because `build()` may already have removed it.
        let _ = std::fs::remove_file(&self.store_path);
    }
}