//! Segment builder: streams documents into an in-memory inverted index,
//! vector buffers, and a temporary document store, then serializes the
//! finished segment through a `Directory`.

mod config;
mod posting;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};

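/// Write-buffer size (16 MiB) for the temporary on-disk document store.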
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Incremental builder for a single index segment.
///
/// Documents are added one at a time: text fields are tokenized into an
/// in-memory inverted index, vector fields are buffered, and the raw
/// document is spooled to a temporary on-disk store. [`Self::build`]
/// then serializes everything into the final segment files.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so each unique term is stored once.
    term_interner: Rodeo,

    /// Term -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer for the temporary document store.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    /// Per-field token statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: `num_indexed_fields` slots per doc.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch buffer for the current document's term frequencies.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer for the current token.
    token_buffer: String,

    /// Dense vector buffers, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector buffers, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Term -> per-document position lists, for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields with positions enabled, and their position mode.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Element ordinal per multi-valued field within the current document.
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Running estimate of heap memory used by the builder.
    estimated_memory: usize,
}

impl SegmentBuilder {
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a dense slot to every indexed text field so per-document
        // field lengths can live in one flat Vec.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

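    /// Current estimate of heap memory used by the builder, maintained
    /// incrementally as documents are added.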
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

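    /// Replaces the incremental memory estimate with a freshly computed
    /// value from [`Self::stats`], correcting any accumulated drift.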
    pub fn recalibrate_memory(&mut self) {
        self.estimated_memory = self.stats().estimated_memory_bytes;
    }

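    /// Total number of distinct (field, dimension) posting lists across
    /// all sparse vector fields.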
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

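    /// Computes detailed builder statistics, including a per-component
    /// estimate of heap memory. Sizes are approximations derived from
    /// container capacities plus fixed per-entry overheads.
    ///
    /// ```ignore
    /// // Illustrative only; assumes an existing `builder`.
    /// let stats = builder.stats();
    /// println!("{} docs, ~{} bytes", stats.num_docs, stats.estimated_memory_bytes);
    /// ```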
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>();
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // Rough per-entry bookkeeping overhead assumed for hash maps.
        let hashmap_entry_base_overhead = 8usize;
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        let interner_arena_overhead = 2 * size_of::<usize>();
        // Assume an average interned term length of 8 bytes.
        let avg_term_len = 8;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead;
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

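    /// Indexes a document and spools it to the temporary store, returning
    /// its segment-local `DocId`. Repeated values of a multi-valued field
    /// each receive their own element ordinal.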
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    // Count the document once, not once per element.
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

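    /// Tokenizes `text` (whitespace split, alphanumeric-only, lowercased),
    /// updates term frequencies and, if enabled for the field, token
    /// positions. Returns the number of tokens produced.
    ///
    /// Position encoding packs the element ordinal into the high bits
    /// (`ordinal << 20`), leaving 20 bits for the token position.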
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            // Lowercase and keep only alphanumeric characters.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let is_new_string = !self.term_interner.contains(&self.token_buffer);
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            if is_new_string {
                use std::mem::size_of;
                self.estimated_memory +=
                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
            }
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Merge this document's term frequencies into the global index.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory +=
                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
                }
            }
        }

        Ok(token_position)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::mem::size_of;

        // Numeric values are indexed as exact-match terms.
        let term_str = format!("__num_{}", value);
        let is_new_string = !self.term_interner.contains(&term_str);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
        }
        if is_new_string {
            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
        }

        Ok(())
    }

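    /// Buffers one dense vector for `field`, enforcing a consistent
    /// dimensionality per field.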
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        use std::mem::{size_of, size_of_val};
        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

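    /// Buffers one sparse vector, dropping entries whose absolute weight
    /// falls below the field's configured threshold.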
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            use std::mem::size_of;
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8;
            }
        }

        Ok(())
    }

    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        // Length-prefixed record: u32 length, then the serialized document.
        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

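    /// Consumes the builder and writes the finished segment (postings,
    /// positions, store, vectors, metadata) through `dir`.
    ///
    /// Postings and the document store are built concurrently via
    /// `rayon::join`; positions are built first because term entries
    /// embed their position offsets.
    ///
    /// ```ignore
    /// // Illustrative only; `dir` and `segment_id` are assumed to exist.
    /// let meta = builder.build(&dir, segment_id).await?;
    /// println!("segment {} holds {} docs", meta.id, meta.num_docs);
    /// ```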
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions must be built first: term entries reference their offsets.
        let position_index = std::mem::take(&mut self.position_index);
        let (positions_data, position_offsets) =
            Self::build_positions_owned(position_index, &self.term_interner)?;

        // Move the large structures out of `self` so postings and the store
        // can be built on separate rayon threads.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let schema_clone = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        let (postings_result, store_result) = rayon::join(
            || Self::build_postings_owned(inverted_index, term_interner, &position_offsets),
            || {
                Self::build_store_batched(
                    &store_path,
                    &schema_clone,
                    num_compression_threads,
                    compression_level,
                )
            },
        );
        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        // Write each file and free its buffer as soon as it is persisted.
        dir.write(&files.term_dict, &term_dict_data).await?;
        drop(term_dict_data);
        dir.write(&files.postings, &postings_data).await?;
        drop(postings_data);
        dir.write(&files.store, &store_data).await?;
        drop(store_data);

        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }
        drop(positions_data);
        drop(position_offsets);

        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        if !dense_vectors.is_empty() {
            let vectors_data = Self::build_vectors_file_binary(dense_vectors, &self.schema)?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        if !sparse_vectors.is_empty() {
            let sparse_data = Self::build_sparse_file_inplace(&mut sparse_vectors, &self.schema)?;
            drop(sparse_vectors);
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

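    /// Serializes all dense vector fields into a single binary file:
    /// a `u32` field count, then per field a (field_id: u32, index_type: u8,
    /// offset: u64, length: u64) header entry, followed by the data blocks.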
    fn build_vectors_file_binary(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
    ) -> Result<Vec<u8>> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field_id, builder) in dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let dense_config = schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);

            let index_bytes = FlatVectorData::serialize_binary_from_flat(
                index_dim,
                &builder.vectors,
                builder.dim,
                &builder.doc_ids,
            );

            // Index type tag 4 marks flat vector data in this file format.
            field_indexes.push((field_id, 4u8, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        field_indexes.sort_by_key(|(id, _, _)| *id);
        // Per field: id (u32) + index type (u8) + offset (u64) + length (u64).
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }

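    /// Serializes sparse vector fields, converting each dimension's
    /// postings into a `BlockSparsePostingList`. Takes the builders by
    /// mutable reference so postings can be sorted (and optionally pruned)
    /// in place without an extra copy.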
    fn build_sparse_file_inplace(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
    ) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in sparse_vectors.iter_mut() {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in builder.postings.iter_mut() {
                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                // Optionally keep only the highest-weight fraction of postings.
                if let Some(fraction) = pruning_fraction
                    && postings.len() > 1
                    && fraction < 1.0
                {
                    let original_len = postings.len();
                    postings.sort_by(|a, b| {
                        b.2.abs()
                            .partial_cmp(&a.2.abs())
                            .unwrap_or(std::cmp::Ordering::Equal)
                    });
                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                    postings.truncate(keep);
                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                }

                let block_list = BlockSparsePostingList::from_postings_with_block_size(
                    postings,
                    quantization,
                    block_size,
                )
                .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header: field count, then per field id (u32) + quantization (u8) +
        // dim count (u32), followed by a 16-byte (dim_id, offset, length)
        // entry per dimension.
        let mut header_size = 4u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*num_dims as u64) * 16;
        }

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();

        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();

            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
                all_data.extend_from_slice(bytes);
            }
            field_tables.push(table);
        }

        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*num_dims)?;

            for &(dim_id, offset, length) in &field_tables[i] {
                output.write_u32::<LittleEndian>(dim_id)?;
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);
        Ok(output)
    }

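    /// Serializes position posting lists, returning the raw bytes plus a
    /// map from term-dictionary key to (offset, length) within them.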
    #[allow(clippy::type_complexity)]
    fn build_positions_owned(
        position_index: HashMap<TermKey, PositionPostingListBuilder>,
        term_interner: &Rodeo,
    ) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        // Key each entry by field id (LE bytes) + term text, matching the
        // term dictionary's key layout.
        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
            .into_iter()
            .map(|(term_key, pos_builder)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_builder)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in pos_builder.postings {
                pos_list.push(doc_id, positions);
            }

            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }

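    /// Builds the term dictionary (SSTable) and postings file from the
    /// in-memory index. Takes ownership of the index and interner so their
    /// memory is released as soon as serialization is done.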
    fn build_postings_owned(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // The interner is no longer needed; free it before the parallel sort.
        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize posting lists in parallel; small lists without positions
        // are inlined directly into the term dictionary.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }
                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                    block_list.serialize(&mut posting_bytes)?;
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        // Stream the sorted terms into the SSTable term dictionary.
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);

                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            posting_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(posting_offset, posting_len, doc_count)
                    }
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

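    /// Rebuilds the final compressed document store: memory-maps the
    /// temporary spool file, deserializes documents in parallel batches,
    /// and re-serializes them through the compressing store writer.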
    fn build_store_batched(
        store_path: &Path,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Scan the length-prefixed records to find each document's byte range.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Deserialize in parallel batches, then feed the writer in order.
        const BATCH_SIZE: usize = 10_000;
        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for batch in doc_ranges.chunks(BATCH_SIZE) {
            let batch_docs: Vec<Document> = batch
                .par_iter()
                .filter_map(|&(start, len)| {
                    let doc_bytes = &mmap[start..start + len];
                    super::store::deserialize_document(doc_bytes, schema).ok()
                })
                .collect();

            for doc in &batch_docs {
                store_writer.store(doc, schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}