mod config;
mod posting;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

use super::vector_data::FlatVectorData;

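/// Size of the in-memory buffer (16 MiB) for the temporary on-disk document
/// store that documents are appended to during indexing.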
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Builds a single segment from a stream of documents, buffering postings,
/// positions, and vector data in memory while spilling the document store to
/// a temporary file.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner for term strings shared across all fields.
    term_interner: Rodeo,

    /// In-memory inverted index: (field, term) -> posting list under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk document store and its path.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths, one slot per indexed text field.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch buffer for per-document term frequencies.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer for normalized tokens.
    token_buffer: String,

    /// Dense vector builders, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector builders, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Position mode per field id, for fields that record positions.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field count of values seen so far in the document being added.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap memory used by in-memory structures.
    estimated_memory: usize,
}

impl SegmentBuilder {
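    /// Creates an empty builder for `schema`, opening a fresh temporary store
    /// file in the configured temp directory and precomputing the slot layout
    /// for indexed text fields.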
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

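    /// Returns the running estimate of heap memory held by this builder.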
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

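    /// Replaces the incrementally maintained memory estimate with a full
    /// recomputation from `stats()`.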
    pub fn recalibrate_memory(&mut self) {
        self.estimated_memory = self.stats().estimated_memory_bytes;
    }

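    /// Number of sparse-vector dimension posting lists buffered across all fields.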
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

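    /// Computes a snapshot of builder statistics, including a per-component
    /// estimate of heap memory usage. Sizes are approximations based on
    /// container capacities plus fixed per-entry overheads.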
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Assumed fixed per-entry overhead for hash map entries.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Assumed average interned term length in bytes.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

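    /// Adds a document to the segment, indexing each field according to its
    /// schema entry and appending the serialized document to the temporary
    /// store. Returns the document's local doc id.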
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() {
                continue;
            }

            let entry = entry.unwrap();
            // Dense vector fields are handled below even when only stored, so
            // they are not skipped on the `indexed` flag alone.
            let dominated_by_index = matches!(&entry.field_type, FieldType::DenseVector);
            if !dominated_by_index && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

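    /// Tokenizes `text` with a simple whitespace split plus lowercase
    /// alphanumeric normalization, updates term frequencies for `(field, term)`
    /// keys, and records encoded positions when the field has a position mode
    /// enabled. Returns the number of tokens indexed.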
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let is_new_string = !self.term_interner.contains(&self.token_buffer);
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            if is_new_string {
                use std::mem::size_of;
                self.estimated_memory +=
                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
            }
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                // Depending on the mode, positions encode the element ordinal
                // in the upper bits and/or the token position in the lower 20 bits.
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory +=
                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
                }
            }
        }

        Ok(token_position)
    }

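    /// Indexes a numeric value as an exact-match term of the form
    /// `__num_{value}` in the field's posting lists.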
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::mem::size_of;

        let term_str = format!("__num_{}", value);
        let is_new_string = !self.term_interner.contains(&term_str);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
        }
        if is_new_string {
            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
        }

        Ok(())
    }

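    /// Appends a dense vector for `(doc_id, ordinal)` to the field's flat
    /// vector buffer, enforcing a consistent dimensionality per field.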
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        use std::mem::{size_of, size_of_val};
        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

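    /// Adds the (dimension, weight) entries of a sparse vector to the field's
    /// per-dimension posting lists, skipping weights whose magnitude falls
    /// below the field's configured `weight_threshold`.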
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            use std::mem::size_of;
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

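    /// Serializes a document and appends it to the temporary store file as a
    /// length-prefixed record.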
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

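    /// Consumes the builder and writes all segment files (positions, term
    /// dictionary, postings, document store, dense and sparse vectors, and
    /// metadata) to `dir`, returning the segment's metadata.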
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = Self::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let schema_clone = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;

        // Build the term dictionary/postings and the compressed document store
        // in parallel; each side writes only to its own writers.
        let (postings_result, store_result) = rayon::join(
            || {
                Self::build_postings_streaming(
                    inverted_index,
                    term_interner,
                    &position_offsets,
                    &mut *term_dict_writer,
                    &mut *postings_writer,
                )
            },
            || {
                Self::build_store_streaming(
                    &store_path,
                    &schema_clone,
                    num_compression_threads,
                    compression_level,
                    &mut *store_writer,
                )
            },
        );
        postings_result?;
        store_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        drop(position_offsets);

        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        if !dense_vectors.is_empty() {
            let mut writer = dir.streaming_writer(&files.vectors).await?;
            Self::build_vectors_streaming(dense_vectors, &self.schema, trained, &mut *writer)?;
            writer.finish()?;
        }

        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        if !sparse_vectors.is_empty() {
            let mut writer = dir.streaming_writer(&files.sparse).await?;
            Self::build_sparse_streaming(&mut sparse_vectors, &self.schema, &mut *writer)?;
            drop(sparse_vectors);
            writer.finish()?;
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

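    /// Writes dense vector data for all fields: flat vector blobs first,
    /// followed by any trained ANN indexes, then a table of contents and a
    /// fixed footer (TOC offset, entry count, magic).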
    fn build_vectors_streaming(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
        trained: Option<&super::TrainedVectorStructures>,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::dsl::{DenseVectorQuantization, VectorIndexType};
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
            .into_iter()
            .filter(|(_, b)| b.len() > 0)
            .collect();
        fields.sort_by_key(|(id, _)| *id);

        if fields.is_empty() {
            return Ok(());
        }

        let quants: Vec<DenseVectorQuantization> = fields
            .iter()
            .map(|(field_id, _)| {
                schema
                    .get_field_entry(Field(*field_id))
                    .and_then(|e| e.dense_vector_config.as_ref())
                    .map(|c| c.quantization)
                    .unwrap_or(DenseVectorQuantization::F32)
            })
            .collect();

        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
        for (i, (_field_id, builder)) in fields.iter().enumerate() {
            field_sizes.push(FlatVectorData::serialized_binary_size(
                builder.dim,
                builder.len(),
                quants[i],
            ));
        }

        const VECTORS_FOOTER_MAGIC: u32 = 0x32434556;

        struct TocEntry {
            field_id: u32,
            index_type: u8,
            offset: u64,
            size: u64,
        }
        let mut toc: Vec<TocEntry> = Vec::with_capacity(fields.len() * 2);
        let mut current_offset = 0u64;

        // Build optional ANN indexes up front so they can be appended after
        // the flat vector data.
        let mut ann_blobs: Vec<(u32, u8, Vec<u8>)> = Vec::new();
        if let Some(trained) = trained {
            for (field_id, builder) in &fields {
                let config = schema
                    .get_field_entry(Field(*field_id))
                    .and_then(|e| e.dense_vector_config.as_ref());

                if let Some(config) = config {
                    let dim = builder.dim;
                    let blob = match config.index_type {
                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
                            let centroids = &trained.centroids[field_id];
                            let (mut index, codebook) =
                                super::ann_build::new_ivf_rabitq(dim, centroids);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_ivf_rabitq(index, codebook)
                                .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
                        }
                        VectorIndexType::ScaNN
                            if trained.centroids.contains_key(field_id)
                                && trained.codebooks.contains_key(field_id) =>
                        {
                            let centroids = &trained.centroids[field_id];
                            let codebook = &trained.codebooks[field_id];
                            let mut index = super::ann_build::new_scann(dim, centroids, codebook);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_scann(index, codebook)
                                .map(|b| (super::ann_build::SCANN_TYPE, b))
                        }
                        _ => continue,
                    };
                    match blob {
                        Ok((index_type, bytes)) => {
                            log::info!(
                                "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
                                index_type,
                                field_id,
                                builder.doc_ids.len(),
                                bytes.len()
                            );
                            ann_blobs.push((*field_id, index_type, bytes));
                        }
                        Err(e) => {
                            log::warn!(
                                "[segment_build] ANN serialize failed for field {}: {}",
                                field_id,
                                e
                            );
                        }
                    }
                }
            }
        }

        // Flat vector data, 8-byte aligned per field.
        for (i, (_field_id, builder)) in fields.into_iter().enumerate() {
            let data_offset = current_offset;
            FlatVectorData::serialize_binary_from_flat_streaming(
                builder.dim,
                &builder.vectors,
                &builder.doc_ids,
                quants[i],
                writer,
            )
            .map_err(crate::Error::Io)?;
            current_offset += field_sizes[i] as u64;
            toc.push(TocEntry {
                field_id: _field_id,
                index_type: super::ann_build::FLAT_TYPE,
                offset: data_offset,
                size: field_sizes[i] as u64,
            });
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        // ANN blobs, also 8-byte aligned.
        for (field_id, index_type, blob) in ann_blobs {
            let data_offset = current_offset;
            let blob_len = blob.len() as u64;
            writer.write_all(&blob)?;
            current_offset += blob_len;
            toc.push(TocEntry {
                field_id,
                index_type,
                offset: data_offset,
                size: blob_len,
            });
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        // Table of contents followed by the footer.
        let toc_offset = current_offset;
        for entry in &toc {
            writer.write_u32::<LittleEndian>(entry.field_id)?;
            writer.write_u8(entry.index_type)?;
            writer.write_u64::<LittleEndian>(entry.offset)?;
            writer.write_u64::<LittleEndian>(entry.size)?;
        }

        writer.write_u64::<LittleEndian>(toc_offset)?;
        writer.write_u32::<LittleEndian>(toc.len() as u32)?;
        writer.write_u32::<LittleEndian>(VECTORS_FOOTER_MAGIC)?;

        Ok(())
    }

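    /// Serializes sparse vector posting lists per field and dimension, writing
    /// a header (field count plus a per-field, per-dimension directory of
    /// offsets and lengths) followed by the block-compressed posting data.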
    fn build_sparse_streaming(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if sparse_vectors.is_empty() {
            return Ok(());
        }

        type DimEntry = (u32, Vec<u8>);
        type FieldEntry = (u32, WeightQuantization, Vec<DimEntry>);
        let mut field_data: Vec<FieldEntry> = Vec::new();

        for (&field_id, builder) in sparse_vectors.iter_mut() {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            let mut dims: Vec<DimEntry> = Vec::new();

            for (&dim_id, postings) in builder.postings.iter_mut() {
                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                // Optionally keep only the highest-weight fraction of postings
                // for this dimension, then restore doc-id order.
                if let Some(fraction) = pruning_fraction
                    && postings.len() > 1
                    && fraction < 1.0
                {
                    let original_len = postings.len();
                    postings.sort_by(|a, b| {
                        b.2.abs()
                            .partial_cmp(&a.2.abs())
                            .unwrap_or(std::cmp::Ordering::Equal)
                    });
                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                    postings.truncate(keep);
                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                }

                let block_list = BlockSparsePostingList::from_postings_with_block_size(
                    postings,
                    quantization,
                    block_size,
                )
                .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dims.push((dim_id, bytes));
            }

            dims.sort_by_key(|(id, _)| *id);
            field_data.push((field_id, quantization, dims));
        }

        if field_data.is_empty() {
            return Ok(());
        }

        field_data.sort_by_key(|(id, _, _)| *id);

        // Compute the header size first so per-dimension offsets can point
        // past it into the data section.
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, dims) in &field_data {
            header_size += per_field_header as u64;
            header_size += (dims.len() as u64) * per_dim_entry as u64;
        }

        let mut header = Vec::with_capacity(header_size as usize);
        header.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        for (field_id, quantization, dims) in &field_data {
            header.write_u32::<LittleEndian>(*field_id)?;
            header.write_u8(*quantization as u8)?;
            header.write_u32::<LittleEndian>(dims.len() as u32)?;

            for (dim_id, bytes) in dims {
                header.write_u32::<LittleEndian>(*dim_id)?;
                header.write_u64::<LittleEndian>(current_offset)?;
                header.write_u32::<LittleEndian>(bytes.len() as u32)?;
                current_offset += bytes.len() as u64;
            }
        }

        writer.write_all(&header)?;

        for (_, _, dims) in field_data {
            for (_, bytes) in dims {
                writer.write_all(&bytes)?;
            }
        }

        Ok(())
    }

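    /// Serializes position posting lists in sorted key order and returns a map
    /// from the term-dictionary key (field id bytes + term bytes) to the
    /// (offset, length) of its position data.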
    fn build_positions_streaming(
        position_index: HashMap<TermKey, PositionPostingListBuilder>,
        term_interner: &Rodeo,
        writer: &mut dyn Write,
    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
            .into_iter()
            .map(|(term_key, pos_builder)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_builder)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut current_offset = 0u64;

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in pos_builder.postings {
                pos_list.push(doc_id, positions);
            }

            let mut buf = Vec::new();
            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
            writer.write_all(&buf)?;

            position_offsets.insert(key, (current_offset, buf.len() as u32));
            current_offset += buf.len() as u64;
        }

        Ok(position_offsets)
    }

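    /// Sorts all (field, term) keys, serializes each posting list (inlining
    /// small position-free lists directly into the `TermInfo`), and writes the
    /// term dictionary and external posting data.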
    fn build_postings_streaming(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
        term_dict_writer: &mut dyn Write,
        postings_writer: &mut dyn Write,
    ) -> Result<()> {
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize posting lists in parallel; small lists without positions
        // are inlined into the TermInfo, larger ones become external blocks.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                    block_list.serialize(&mut posting_bytes)?;
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut postings_offset = 0u64;
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict_writer);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_len = bytes.len() as u32;
                    postings_writer.write_all(&bytes)?;

                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            postings_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(postings_offset, posting_len, doc_count)
                    };
                    postings_offset += posting_len as u64;
                    info
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok(())
    }

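    /// Re-reads the temporary store file via mmap, splits it into
    /// length-prefixed document records, and rewrites them through the
    /// parallel compressing store writer.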
    fn build_store_streaming(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Scan the length-prefixed records to find each document's byte range.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        const BATCH_SIZE: usize = 10_000;
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            writer,
            num_compression_threads,
            compression_level,
        );

        for batch in doc_ranges.chunks(BATCH_SIZE) {
            let batch_docs: Vec<Document> = batch
                .par_iter()
                .filter_map(|&(start, len)| {
                    let doc_bytes = &mmap[start..start + len];
                    super::store::deserialize_document(doc_bytes, schema).ok()
                })
                .collect();

            for doc in &batch_docs {
                store_writer.store(doc, schema)?;
            }
        }

        store_writer.finish()?;
        Ok(())
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}