mod config;
mod posting;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use super::vector_data::FlatVectorData;

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

/// Buffer size for the temporary document store spill file.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Estimated heap cost of inserting a brand-new term into the inverted
/// index: key + posting list builder + approximate hashmap entry overhead.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated heap cost of interning a new string in the `Rodeo` interner.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated heap cost of a brand-new entry in the position index.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;

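/// Streaming builder for a single immutable segment.
///
/// Documents are indexed field by field into in-memory posting lists and
/// vector builders, while stored documents are spilled to a temporary
/// length-prefixed file; `build()` then streams everything into the final
/// segment files.
///
/// A minimal usage sketch (`ignore`d: it assumes a prepared `schema`,
/// `config`, `doc`, `segment_id`, an async context, and a
/// `dir: impl Directory + DirectoryWriter`):
///
/// ```ignore
/// let mut builder = SegmentBuilder::new(schema, config)?;
/// builder.add_document(doc)?;
/// let meta = builder.build(&dir, segment_id, None).await?;
/// ```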
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so posting keys stay fixed-size.
    term_interner: Rodeo,

    /// In-memory inverted index: (field, term) -> posting list builder.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temp spill file for stored documents, recompressed on `build()`.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document token counts, one slot per indexed text field.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch buffer for per-document term frequencies.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer for per-document term positions.
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Scratch buffer for the default tokenizer.
    token_buffer: String,

    /// Scratch buffer for formatting numeric terms.
    numeric_buffer: String,

    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position postings for fields with a configured `PositionMode`.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Per-field ordinal of the current element within the current document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally maintained estimate of heap usage.
    estimated_memory: usize,

    doc_serialize_buffer: Vec<u8>,
}

impl SegmentBuilder {
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

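    /// Returns the ordinal of the current element for a multi-valued field
    /// and advances the per-field counter; `add_document` clears the
    /// counters at the start of each document.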
    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
        let counter = self.current_element_ordinal.entry(field_id).or_insert(0);
        let ordinal = *counter;
        *counter += 1;
        ordinal
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

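    /// Recomputes a detailed memory estimate by walking container
    /// capacities. Unlike the incrementally maintained `estimated_memory`,
    /// this touches every map, so it is better suited to periodic
    /// diagnostics than to per-document checks.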
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry bookkeeping overhead for hash maps.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Assume an average interned term length of 8 bytes.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

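    /// Indexes a document and appends it to the temp store, returning its
    /// segment-local `DocId`. Field values are dispatched on the schema's
    /// `FieldType`; unindexed fields (other than stored dense vectors) are
    /// skipped.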
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * size_of::<u32>();

        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = self.next_element_ordinal(field.0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

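    /// Tokenizes `text` (using a registered custom tokenizer if present,
    /// otherwise a default lowercase-alphanumeric split), accumulates
    /// per-term frequencies, and records positions when the field has a
    /// `PositionMode`. Encoded positions pack the element ordinal into the
    /// bits above bit 20, leaving the low 20 bits for the token position.
    /// Returns the token count.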
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        self.local_tf_buffer.clear();
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            for token in &tokens {
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }

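    /// Indexes a numeric value as an exact-match term of the form
    /// `__num_{value}`. Callers map `i64` and `f64` through their u64 bit
    /// patterns (`as u64` / `to_bits`) so all numeric types share this path.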
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::fmt::Write;

        self.numeric_buffer.clear();
        write!(self.numeric_buffer, "__num_{}", value).unwrap();
        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += NEW_TERM_OVERHEAD;
        }
        if is_new_string {
            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
        }

        Ok(())
    }

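    /// Appends a dense vector to the per-field builder, enforcing that all
    /// vectors for a field share one dimensionality.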
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

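    /// Adds sparse `(dimension, weight)` entries to the per-field builder,
    /// dropping entries whose absolute weight falls below the schema's
    /// configured `weight_threshold`.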
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                // Key + empty Vec + approximate hashmap entry overhead.
                self.estimated_memory +=
                    size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

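    /// Serializes the document and appends it to the temp spill file as a
    /// little-endian u32 length prefix followed by the payload;
    /// `build_store_streaming` re-reads this framing later.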
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }

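    /// Consumes the builder and writes the segment to `dir`.
    ///
    /// Positions are flushed first because the term dictionary embeds their
    /// offsets; postings, the document store, and dense/sparse vectors are
    /// then built concurrently via nested `rayon::join`, and the temp spill
    /// file is removed at the end.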
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = Self::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        Self::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        Self::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            Self::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            Self::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
                        }
                        Ok(())
                    },
                )
            },
        );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

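    /// Writes the dense vector file: flat (exact) vector data per field,
    /// then any ANN blobs (IVF-RaBitQ / ScaNN) built in parallel from the
    /// trained structures, each 8-byte aligned, followed by the TOC and
    /// footer.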
    fn build_vectors_streaming(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
        trained: Option<&super::TrainedVectorStructures>,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::dsl::{DenseVectorQuantization, VectorIndexType};
        use crate::segment::format::{DenseVectorTocEntry, write_dense_toc_and_footer};

        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
            .into_iter()
            .filter(|(_, b)| b.len() > 0)
            .collect();
        fields.sort_by_key(|(id, _)| *id);

        if fields.is_empty() {
            return Ok(());
        }

        let quants: Vec<DenseVectorQuantization> = fields
            .iter()
            .map(|(field_id, _)| {
                schema
                    .get_field_entry(Field(*field_id))
                    .and_then(|e| e.dense_vector_config.as_ref())
                    .map(|c| c.quantization)
                    .unwrap_or(DenseVectorQuantization::F32)
            })
            .collect();

        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
        for (i, (_field_id, builder)) in fields.iter().enumerate() {
            field_sizes.push(FlatVectorData::serialized_binary_size(
                builder.dim,
                builder.len(),
                quants[i],
            ));
        }

        let mut toc: Vec<DenseVectorTocEntry> = Vec::with_capacity(fields.len() * 2);
        let mut current_offset = 0u64;

        let ann_blobs: Vec<(u32, u8, Vec<u8>)> = if let Some(trained) = trained {
            fields
                .par_iter()
                .filter_map(|(field_id, builder)| {
                    let config = schema
                        .get_field_entry(Field(*field_id))
                        .and_then(|e| e.dense_vector_config.as_ref())?;

                    let dim = builder.dim;
                    let blob = match config.index_type {
                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
                            let centroids = &trained.centroids[field_id];
                            let (mut index, codebook) =
                                super::ann_build::new_ivf_rabitq(dim, centroids);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_ivf_rabitq(index, codebook)
                                .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
                        }
                        VectorIndexType::ScaNN
                            if trained.centroids.contains_key(field_id)
                                && trained.codebooks.contains_key(field_id) =>
                        {
                            let centroids = &trained.centroids[field_id];
                            let codebook = &trained.codebooks[field_id];
                            let mut index = super::ann_build::new_scann(dim, centroids, codebook);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_scann(index, codebook)
                                .map(|b| (super::ann_build::SCANN_TYPE, b))
                        }
                        _ => return None,
                    };
                    match blob {
                        Ok((index_type, bytes)) => {
                            log::info!(
                                "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
                                index_type,
                                field_id,
                                builder.doc_ids.len(),
                                bytes.len()
                            );
                            Some((*field_id, index_type, bytes))
                        }
                        Err(e) => {
                            log::warn!(
                                "[segment_build] ANN serialize failed for field {}: {}",
                                field_id,
                                e
                            );
                            None
                        }
                    }
                })
                .collect()
        } else {
            Vec::new()
        };

        for (i, (field_id, builder)) in fields.into_iter().enumerate() {
            let data_offset = current_offset;
            FlatVectorData::serialize_binary_from_flat_streaming(
                builder.dim,
                &builder.vectors,
                &builder.doc_ids,
                quants[i],
                writer,
            )
            .map_err(crate::Error::Io)?;
            current_offset += field_sizes[i] as u64;
            toc.push(DenseVectorTocEntry {
                field_id,
                index_type: super::ann_build::FLAT_TYPE,
                offset: data_offset,
                size: field_sizes[i] as u64,
            });
            // Pad to 8-byte alignment before the next entry.
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        for (field_id, index_type, blob) in ann_blobs {
            let data_offset = current_offset;
            let blob_len = blob.len() as u64;
            writer.write_all(&blob)?;
            current_offset += blob_len;
            toc.push(DenseVectorTocEntry {
                field_id,
                index_type,
                offset: data_offset,
                size: blob_len,
            });
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        write_dense_toc_and_footer(writer, current_offset, &toc)?;

        Ok(())
    }

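    /// Writes the sparse vector file: per-dimension block posting lists
    /// (optionally pruned to the highest-weight fraction of postings),
    /// followed by the shared skip-entry table and the per-field
    /// TOC/footer.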
    fn build_sparse_streaming(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::segment::format::{
            SparseDimTocEntry, SparseFieldToc, write_sparse_toc_and_footer,
        };
        use crate::structures::{BlockSparsePostingList, SparseSkipEntry, WeightQuantization};

        if sparse_vectors.is_empty() {
            return Ok(());
        }

        let mut field_ids: Vec<u32> = sparse_vectors.keys().copied().collect();
        field_ids.sort_unstable();

        let mut field_tocs: Vec<SparseFieldToc> = Vec::new();
        let mut all_skip_entries: Vec<SparseSkipEntry> = Vec::new();
        let mut current_offset = 0u64;

        for &field_id in &field_ids {
            let builder = sparse_vectors.get_mut(&field_id).unwrap();
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            let mut dims: Vec<_> = std::mem::take(&mut builder.postings).into_iter().collect();
            dims.sort_unstable_by_key(|(id, _)| *id);

            let serialized_dims: Vec<(u32, u32, Vec<u8>, Vec<SparseSkipEntry>)> = dims
                .into_par_iter()
                .map(|(dim_id, mut postings)| {
                    postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                    if let Some(fraction) = pruning_fraction
                        && postings.len() > 1
                        && fraction < 1.0
                    {
                        let original_len = postings.len();
                        postings.sort_by(|a, b| {
                            b.2.abs()
                                .partial_cmp(&a.2.abs())
                                .unwrap_or(std::cmp::Ordering::Equal)
                        });
                        let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                        postings.truncate(keep);
                        postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                    }

                    let block_list = BlockSparsePostingList::from_postings_with_block_size(
                        &postings,
                        quantization,
                        block_size,
                    )
                    .map_err(crate::Error::Io)?;

                    let doc_count = block_list.doc_count;
                    let (block_data, skip_entries) =
                        block_list.serialize_v3().map_err(crate::Error::Io)?;
                    Ok((dim_id, doc_count, block_data, skip_entries))
                })
                .collect::<Result<Vec<_>>>()?;

            let mut dim_toc_entries: Vec<SparseDimTocEntry> =
                Vec::with_capacity(serialized_dims.len());
            for (dim_id, doc_count, block_data, skip_entries) in &serialized_dims {
                let block_data_offset = current_offset as u32;
                let skip_start = all_skip_entries.len() as u32;
                let num_blocks = skip_entries.len() as u32;
                let max_weight = skip_entries
                    .iter()
                    .map(|e| e.max_weight)
                    .fold(0.0f32, f32::max);

                writer.write_all(block_data)?;
                current_offset += block_data.len() as u64;

                all_skip_entries.extend_from_slice(skip_entries);

                dim_toc_entries.push(SparseDimTocEntry {
                    dim_id: *dim_id,
                    block_data_offset,
                    skip_start,
                    num_blocks,
                    doc_count: *doc_count,
                    max_weight,
                });
            }

            if !dim_toc_entries.is_empty() {
                field_tocs.push(SparseFieldToc {
                    field_id,
                    quantization: quantization as u8,
                    dims: dim_toc_entries,
                });
            }
        }

        if field_tocs.is_empty() {
            return Ok(());
        }

        let skip_offset = current_offset;
        for entry in &all_skip_entries {
            entry.write(writer).map_err(crate::Error::Io)?;
        }
        current_offset += (all_skip_entries.len() * SparseSkipEntry::SIZE) as u64;

        let toc_offset = current_offset;
        write_sparse_toc_and_footer(writer, skip_offset, toc_offset, &field_tocs)
            .map_err(crate::Error::Io)?;

        Ok(())
    }

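    /// Serializes position posting lists sorted by key (`field_id` LE bytes
    /// + term bytes) and returns each key's `(offset, len)` for
    /// `build_postings_streaming` to embed in the term dictionary.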
    fn build_positions_streaming(
        position_index: HashMap<TermKey, PositionPostingListBuilder>,
        term_interner: &Rodeo,
        writer: &mut dyn Write,
    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
            .into_iter()
            .map(|(term_key, pos_builder)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_builder)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut current_offset = 0u64;
        let mut buf = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in pos_builder.postings {
                pos_list.push(doc_id, positions);
            }

            buf.clear();
            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
            writer.write_all(&buf)?;

            position_offsets.insert(key, (current_offset, buf.len() as u32));
            current_offset += buf.len() as u64;
        }

        Ok(position_offsets)
    }

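    /// Builds the term dictionary and postings file. Keys are sorted in
    /// parallel, small posting lists without positions are inlined directly
    /// into `TermInfo`, and the rest are serialized as block posting lists
    /// and referenced by offset from the SSTable.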
    fn build_postings_streaming(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
        term_dict_writer: &mut dyn Write,
        postings_writer: &mut dyn Write,
    ) -> Result<()> {
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let has_positions = position_offsets.contains_key(&key);

                if !has_positions
                    && let Some(inline) = TermInfo::try_inline_iter(
                        posting_builder.postings.len(),
                        posting_builder
                            .postings
                            .iter()
                            .map(|p| (p.doc_id, p.term_freq as u32)),
                    )
                {
                    return Ok((key, SerializedPosting::Inline(inline)));
                }

                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let mut posting_bytes = Vec::new();
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut posting_bytes)?;
                let result = SerializedPosting::External {
                    bytes: posting_bytes,
                    doc_count: full_postings.doc_count(),
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut postings_offset = 0u64;
        let mut writer = SSTableWriter::<_, TermInfo>::new(term_dict_writer);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_len = bytes.len() as u32;
                    postings_writer.write_all(&bytes)?;

                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            postings_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(postings_offset, posting_len, doc_count)
                    };
                    postings_offset += posting_len as u64;
                    info
                }
            };

            writer.insert(&key, &term_info)?;
        }

        let _ = writer.finish()?;
        Ok(())
    }

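    /// Recompresses the temp spill file into the final document store: the
    /// file is mmapped, each length-prefixed document is re-read, and
    /// compression fans out across `num_compression_threads`.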
    fn build_store_streaming(
        store_path: &PathBuf,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            writer,
            num_compression_threads,
            compression_level,
        );

        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            store_writer.store_raw(doc_bytes)?;
            offset += doc_len;
        }

        store_writer.finish()?;
        Ok(())
    }
}

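// Best-effort cleanup of the temp spill file if the builder is dropped
// without `build()` running to completion (`build()` also removes it).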
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}