mod config;
mod posting;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};

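/// Buffer size (16 MiB) for the writer over the temporary document store.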
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Builds a single segment in memory: documents are indexed into in-memory
/// postings, vectors, and positions, spilled to a temporary store file, and
/// finally serialized to the segment's on-disk files by [`Self::build`].
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

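    /// Interns term strings so postings can reference them via compact `Spur` keys.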
    term_interner: Rodeo,

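    /// In-memory inverted index: `(field, term)` key to posting list under construction.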
    inverted_index: HashMap<TermKey, PostingListBuilder>,

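    /// Buffered temp-file writer that spills stored documents to disk as they are added.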
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

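    /// Per-field token totals and document counts, keyed by field id.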
    field_stats: FxHashMap<u32, FieldStats>,

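    /// Flattened per-document lengths of indexed text fields: the slot for a
    /// field in document `d` is `d * num_indexed_fields + field_to_slot[field]`.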
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

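    /// Scratch map of term frequencies for the field value currently being indexed.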
    local_tf_buffer: FxHashMap<Spur, u32>,

    token_buffer: String,

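    /// Dense-vector builders, keyed by field id.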
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

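    /// Sparse-vector builders, keyed by field id.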
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

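    /// Per-term position postings for fields with positional indexing enabled.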
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

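    /// Position mode per field id, for the fields that index positions.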
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

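    /// Tracks, per field, how many values have been seen for the current document.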
    current_element_ordinal: FxHashMap<u32, u32>,

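    /// Running heap-usage estimate, updated incrementally as content is added.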
    estimated_memory: usize,
}

impl SegmentBuilder {
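    /// Creates a builder whose temporary document store lives under
    /// `config.temp_dir`.
    ///
    /// A minimal usage sketch (identifiers such as `schema`, `doc`, `dir`,
    /// and `segment_id` are illustrative placeholders, and it assumes
    /// `SegmentBuilderConfig` provides a `Default`):
    ///
    /// ```ignore
    /// let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
    /// let doc_id = builder.add_document(doc)?;
    /// let meta = builder.build(&dir, segment_id).await?;
    /// ```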
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

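    /// Returns the incrementally-maintained memory estimate without recomputing it.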
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

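    /// Recomputes the memory estimate from scratch via `stats()`; the
    /// incremental per-operation accounting is approximate, so callers can
    /// invoke this periodically to correct accumulated drift.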
    pub fn recalibrate_memory(&mut self) {
        self.estimated_memory = self.stats().estimated_memory_bytes;
    }

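    /// Total number of distinct `(field, dimension)` sparse posting lists in memory.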
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

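    /// Computes detailed builder statistics. The byte figures are heuristic
    /// estimates derived from container capacities and assumed per-entry
    /// overheads, not exact allocator measurements.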
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

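        // Assumed fixed bookkeeping cost per hash-table entry.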
        let hashmap_entry_base_overhead = 8usize;

        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
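        // Heuristic: assume interned terms average 8 bytes each.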
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

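    /// Indexes one document and appends it to the temporary store, returning
    /// its `DocId`. Repeated values of a multi-valued field are distinguished
    /// by a per-field element ordinal.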
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal =
                        *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    let element_ordinal =
                        *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let element_ordinal =
                        *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

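    /// Tokenizes `text` with a simple inline pipeline (whitespace split,
    /// alphanumeric filter, lowercasing), accumulates term frequencies, and
    /// records encoded positions when the field has a position mode. Returns
    /// the number of tokens indexed.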
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let is_new_string = !self.term_interner.contains(&self.token_buffer);
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            if is_new_string {
                use std::mem::size_of;
                self.estimated_memory +=
                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
            }
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
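                // Positions pack into a single u32: the element ordinal sits
                // above bit 20 and the token position in the low 20 bits
                // (assuming token positions stay below 2^20). For example,
                // ordinal 2 and token 5 under `Full` encode as
                // (2 << 20) | 5 = 0x0020_0005.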
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

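        // Flush the accumulated term frequencies (and buffered positions)
        // into the global posting structures, charging the memory estimate.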
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory +=
                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
                }
            }
        }

        Ok(token_position)
    }

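    /// Indexes a numeric value as the exact-match term `__num_<value>`; i64
    /// and f64 callers first map their values to `u64` bit patterns.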
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::mem::size_of;

        let term_str = format!("__num_{}", value);
        let is_new_string = !self.term_interner.contains(&term_str);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
        }
        if is_new_string {
            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
        }

        Ok(())
    }

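    /// Appends one dense vector for `(doc_id, ordinal)`, rejecting vectors
    /// whose dimensionality differs from those already stored for the field.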
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        use std::mem::{size_of, size_of_val};
        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

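    /// Appends sparse entries for `(doc_id, ordinal)`, skipping weights whose
    /// magnitude falls below the field's configured `weight_threshold`.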
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            use std::mem::size_of;
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory +=
                    size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

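    /// Appends the serialized document to the temp store as a
    /// length-prefixed (little-endian `u32`) record.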
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

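    /// Consumes the builder and writes the finished segment to `dir`:
    /// positions first, then the term dictionary, postings, and document
    /// store (postings/term dictionary and the store are built in parallel
    /// via `rayon::join`), followed by dense vectors, sparse vectors, and
    /// the segment metadata.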
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = Self::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let schema_clone = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;

        let (postings_result, store_result) = rayon::join(
            || {
                Self::build_postings_streaming(
                    inverted_index,
                    term_interner,
                    &position_offsets,
                    &mut *term_dict_writer,
                    &mut *postings_writer,
                )
            },
            || {
                Self::build_store_streaming(
                    &store_path,
                    &schema_clone,
                    num_compression_threads,
                    compression_level,
                    &mut *store_writer,
                )
            },
        );
        postings_result?;
        store_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        drop(position_offsets);

        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        if !dense_vectors.is_empty() {
            let mut writer = dir.streaming_writer(&files.vectors).await?;
            Self::build_vectors_streaming(dense_vectors, &self.schema, &mut *writer)?;
            writer.finish()?;
        }

        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        if !sparse_vectors.is_empty() {
            let mut writer = dir.streaming_writer(&files.sparse).await?;
            Self::build_sparse_streaming(&mut sparse_vectors, &self.schema, &mut *writer)?;
            drop(sparse_vectors);
            writer.finish()?;
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

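    /// Serializes dense vectors: a header of `(field_id, index_type, offset,
    /// len)` entries, then each field's flat binary vector data in field-id
    /// order.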
    fn build_vectors_streaming(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
            .into_iter()
            .filter(|(_, b)| b.len() > 0)
            .collect();
        fields.sort_by_key(|(id, _)| *id);

        if fields.is_empty() {
            return Ok(());
        }

        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
        for (field_id, builder) in &fields {
            let field = crate::dsl::Field(*field_id);
            let dense_config = schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            field_sizes.push(FlatVectorData::serialized_binary_size(
                index_dim,
                builder.len(),
            ));
        }

        let per_field_entry =
            size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
        let header_size = size_of::<u32>() + fields.len() * per_field_entry;
        let mut header = Vec::with_capacity(header_size);
        header.write_u32::<LittleEndian>(fields.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (i, (field_id, _)) in fields.iter().enumerate() {
            header.write_u32::<LittleEndian>(*field_id)?;
            const FLAT_BINARY_INDEX_TYPE: u8 = 4;
            header.write_u8(FLAT_BINARY_INDEX_TYPE)?;
            header.write_u64::<LittleEndian>(current_offset)?;
            header.write_u64::<LittleEndian>(field_sizes[i] as u64)?;
            current_offset += field_sizes[i] as u64;
        }
        writer.write_all(&header)?;

        for (field_id, builder) in fields {
            let field = crate::dsl::Field(field_id);
            let dense_config = schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);

            FlatVectorData::serialize_binary_from_flat_streaming(
                index_dim,
                &builder.vectors,
                builder.dim,
                &builder.doc_ids,
                writer,
            )
            .map_err(crate::Error::Io)?;
        }

        Ok(())
    }

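    /// Serializes sparse vectors: per-dimension postings are sorted,
    /// optionally pruned to the highest-magnitude fraction, and
    /// block-compressed; a header maps each `(field, dimension)` to its
    /// offset and length in the file.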
    fn build_sparse_streaming(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if sparse_vectors.is_empty() {
            return Ok(());
        }

        type DimEntry = (u32, Vec<u8>);
        type FieldEntry = (u32, WeightQuantization, Vec<DimEntry>);
        let mut field_data: Vec<FieldEntry> = Vec::new();

        for (&field_id, builder) in sparse_vectors.iter_mut() {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            let mut dims: Vec<DimEntry> = Vec::new();

            for (&dim_id, postings) in builder.postings.iter_mut() {
                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                if let Some(fraction) = pruning_fraction
                    && postings.len() > 1
                    && fraction < 1.0
                {
                    let original_len = postings.len();
                    postings.sort_by(|a, b| {
                        b.2.abs()
                            .partial_cmp(&a.2.abs())
                            .unwrap_or(std::cmp::Ordering::Equal)
                    });
                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                    postings.truncate(keep);
                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                }

                let block_list = BlockSparsePostingList::from_postings_with_block_size(
                    postings,
                    quantization,
                    block_size,
                )
                .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dims.push((dim_id, bytes));
            }

            dims.sort_by_key(|(id, _)| *id);
            field_data.push((field_id, quantization, dims));
        }

        if field_data.is_empty() {
            return Ok(());
        }

        field_data.sort_by_key(|(id, _, _)| *id);

        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, dims) in &field_data {
            header_size += per_field_header as u64;
            header_size += (dims.len() as u64) * per_dim_entry as u64;
        }

        let mut header = Vec::with_capacity(header_size as usize);
        header.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        for (field_id, quantization, dims) in &field_data {
            header.write_u32::<LittleEndian>(*field_id)?;
            header.write_u8(*quantization as u8)?;
            header.write_u32::<LittleEndian>(dims.len() as u32)?;

            for (dim_id, bytes) in dims {
                header.write_u32::<LittleEndian>(*dim_id)?;
                header.write_u64::<LittleEndian>(current_offset)?;
                header.write_u32::<LittleEndian>(bytes.len() as u32)?;
                current_offset += bytes.len() as u64;
            }
        }

        writer.write_all(&header)?;

        for (_, _, dims) in field_data {
            for (_, bytes) in dims {
                writer.write_all(&bytes)?;
            }
        }

        Ok(())
    }

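    /// Writes position posting lists sorted by `(field, term)` key and
    /// returns each key's `(offset, len)` within the positions file.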
    fn build_positions_streaming(
        position_index: HashMap<TermKey, PositionPostingListBuilder>,
        term_interner: &Rodeo,
        writer: &mut dyn Write,
    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
            .into_iter()
            .map(|(term_key, pos_builder)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_builder)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut current_offset = 0u64;

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in pos_builder.postings {
                pos_list.push(doc_id, positions);
            }

            let mut buf = Vec::new();
            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
            writer.write_all(&buf)?;

            position_offsets.insert(key, (current_offset, buf.len() as u32));
            current_offset += buf.len() as u64;
        }

        Ok(position_offsets)
    }

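    /// Builds the term dictionary and postings file. Keys are sorted and
    /// serialized in parallel; small positionless posting lists are inlined
    /// directly into their `TermInfo`, the rest are block-encoded and
    /// referenced by offset.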
    fn build_postings_streaming(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
        term_dict_writer: &mut dyn Write,
        postings_writer: &mut dyn Write,
    ) -> Result<()> {
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                    block_list.serialize(&mut posting_bytes)?;
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        let mut postings_offset = 0u64;
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict_writer);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_len = bytes.len() as u32;
                    postings_writer.write_all(&bytes)?;

                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            postings_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(postings_offset, posting_len, doc_count)
                    };
                    postings_offset += posting_len as u64;
                    info
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok(())
    }

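    /// Replays the length-prefixed temp store through the compressed store
    /// writer, deserializing documents in parallel batches via a memory map.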
    fn build_store_streaming(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        const BATCH_SIZE: usize = 10_000;
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            writer,
            num_compression_threads,
            compression_level,
        );

        for batch in doc_ranges.chunks(BATCH_SIZE) {
            let batch_docs: Vec<Document> = batch
                .par_iter()
                .filter_map(|&(start, len)| {
                    let doc_bytes = &mmap[start..start + len];
                    super::store::deserialize_document(doc_bytes, schema).ok()
                })
                .collect();

            for doc in &batch_docs {
                store_writer.store(doc, schema)?;
            }
        }

        store_writer.finish()?;
        Ok(())
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
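        // Best-effort cleanup of the temp store file when the builder is
        // dropped without completing `build`.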
        let _ = std::fs::remove_file(&self.store_path);
    }
}