1use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14
15use hashbrown::HashMap;
16use lasso::{Rodeo, Spur};
17use rayon::prelude::*;
18use rustc_hash::FxHashMap;
19
20use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
21use crate::compression::CompressionLevel;
22use crate::directories::{Directory, DirectoryWriter};
23use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
24use crate::structures::{PostingList, SSTableWriter, TermInfo};
25use crate::tokenizer::BoxedTokenizer;
26use crate::{DocId, Result};
27
28pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
30
31const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; #[derive(Clone, Copy, PartialEq, Eq, Hash)]
36struct TermKey {
37 field: u32,
38 term: Spur,
39}
40
/// One posting held in memory while building: a document id plus the term's
/// frequency within that document. The frequency is stored as `u16`
/// (saturated on insert) to keep per-posting memory small.
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}
47
48struct PostingListBuilder {
50 postings: Vec<CompactPosting>,
52}
53
54impl PostingListBuilder {
55 fn new() -> Self {
56 Self {
57 postings: Vec::new(),
58 }
59 }
60
61 #[inline]
63 fn add(&mut self, doc_id: DocId, term_freq: u32) {
64 if let Some(last) = self.postings.last_mut()
66 && last.doc_id == doc_id
67 {
68 last.term_freq = last.term_freq.saturating_add(term_freq as u16);
69 return;
70 }
71 self.postings.push(CompactPosting {
72 doc_id,
73 term_freq: term_freq.min(u16::MAX as u32) as u16,
74 });
75 }
76
77 fn len(&self) -> usize {
78 self.postings.len()
79 }
80}
81
/// Accumulates per-document token positions for a single `(field, term)`
/// pair. Each entry pairs a doc id with the encoded positions seen in it.
struct PositionPostingListBuilder {
    // Kept in insertion order; doc ids arrive non-decreasing, so all
    // positions for one document are contiguous in the last entry.
    postings: Vec<(DocId, Vec<u32>)>,
}

impl PositionPostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Appends `position` for `doc_id`, extending the last entry when the
    /// doc id matches and starting a new `(doc, positions)` entry otherwise.
    #[inline]
    fn add_position(&mut self, doc_id: DocId, position: u32) {
        if let Some((last_doc, positions)) = self.postings.last_mut()
            && *last_doc == doc_id
        {
            positions.push(position);
            return;
        }
        self.postings.push((doc_id, vec![position]));
    }
}
107
/// Result of serializing one posting list in `build_postings`.
enum SerializedPosting {
    /// Small list packed entirely inside the `TermInfo` dictionary entry.
    Inline(TermInfo),
    /// Larger list serialized to the postings file; `bytes` is the encoded
    /// block list and `doc_count` the number of documents it covers.
    External { bytes: Vec<u8>, doc_count: u32 },
}
115
/// Point-in-time snapshot of a builder's size, returned by
/// [`SegmentBuilder::stats`]. Byte figures are heuristic estimates.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Distinct `(field, term)` posting lists in memory.
    pub unique_terms: usize,
    /// Total postings across all in-memory lists.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Entries in the flattened per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Sum of all estimated byte counts in `memory_breakdown`.
    pub estimated_memory_bytes: usize,
    /// Per-component byte estimates.
    pub memory_breakdown: MemoryBreakdown,
}
134
/// Estimated memory usage of each builder component, in bytes.
/// All values are approximations computed in [`SegmentBuilder::stats`].
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    /// In-memory posting vectors (capacity-based).
    pub postings_bytes: usize,
    /// Hash-map keys/values/overhead of the inverted index.
    pub index_overhead_bytes: usize,
    /// Term interner estimate (assumes an average term length).
    pub interner_bytes: usize,
    /// Flattened per-document field-length table.
    pub field_lengths_bytes: usize,
    /// Dense-vector accumulators (f32 data + doc ids).
    pub dense_vectors_bytes: usize,
    /// Number of dense vectors buffered.
    pub dense_vector_count: usize,
    /// Sparse-vector accumulators (per-dimension posting vecs).
    pub sparse_vectors_bytes: usize,
    /// Position index (per-term position vectors).
    pub position_index_bytes: usize,
}
155
/// Tuning knobs for [`SegmentBuilder`]; see `Default` for baseline values.
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary document-store spool file.
    pub temp_dir: PathBuf,
    /// Compression level applied when building the document store.
    pub compression_level: CompressionLevel,
    /// Worker threads used by the parallel store writer.
    pub num_compression_threads: usize,
    /// Intended pre-allocation for the term interner.
    /// NOTE(review): not applied in `SegmentBuilder::new` (which calls
    /// `Rodeo::new()`) — confirm whether this is wired up elsewhere.
    pub interner_capacity: usize,
    /// Initial capacity of the inverted-index hash map.
    pub posting_map_capacity: usize,
}
170
171impl Default for SegmentBuilderConfig {
172 fn default() -> Self {
173 Self {
174 temp_dir: std::env::temp_dir(),
175 compression_level: CompressionLevel(7),
176 num_compression_threads: num_cpus::get(),
177 interner_capacity: 1_000_000,
178 posting_map_capacity: 500_000,
179 }
180 }
181}
182
/// In-memory builder for one index segment: accumulates documents, then
/// `build` serializes all segment files (term dictionary, postings, store,
/// positions, dense/sparse vectors) through a `Directory`.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    // Per-field tokenizers registered via `set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    // Interns term strings so posting keys are fixed-size `Spur`s.
    term_interner: Rodeo,

    // (field, term) -> in-memory posting list.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    // Spool file receiving length-prefixed serialized documents; the
    // compressed store is rebuilt from it during `build`.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    // Next doc id to assign == number of documents added so far.
    next_doc_id: DocId,

    // Per-field token/document counters, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    // Flattened [doc][slot] token counts for indexed text fields.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    // Field id -> slot index within one document's length row.
    field_to_slot: FxHashMap<u32, usize>,

    // Scratch: term -> term frequency for the text value being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    // Scratch: reusable buffer for the token currently being normalized.
    token_buffer: String,

    // Per-field dense-vector accumulators, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    // Per-field sparse-vector accumulators, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    // (field, term) -> encoded positions, only for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    // Text fields with positions enabled, and their encoding mode.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    // Ordinal of the current multi-valued element per field, reset per doc.
    current_element_ordinal: FxHashMap<u32, u32>,

    // NOTE(review): never written after construction in this file (stays 0);
    // `stats()` computes its own estimate — confirm whether this is
    // maintained elsewhere or stale.
    estimated_memory: usize,
}
245
/// Accumulates dense vectors for one field in a single flat `f32` buffer;
/// vector `i` occupies `vectors[i*dim .. (i+1)*dim]` and belongs to
/// `doc_ids[i]`.
struct DenseVectorBuilder {
    dim: usize,
    doc_ids: Vec<DocId>,
    vectors: Vec<f32>,
}
255
256impl DenseVectorBuilder {
257 fn new(dim: usize) -> Self {
258 Self {
259 dim,
260 doc_ids: Vec::new(),
261 vectors: Vec::new(),
262 }
263 }
264
265 fn add(&mut self, doc_id: DocId, vector: &[f32]) {
266 debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
267 self.doc_ids.push(doc_id);
268 self.vectors.extend_from_slice(vector);
269 }
270
271 fn len(&self) -> usize {
272 self.doc_ids.len()
273 }
274
275 fn get_vectors(&self) -> Vec<Vec<f32>> {
277 self.doc_ids
278 .iter()
279 .enumerate()
280 .map(|(i, _)| {
281 let start = i * self.dim;
282 self.vectors[start..start + self.dim].to_vec()
283 })
284 .collect()
285 }
286
287 fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
289 debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
290 self.doc_ids
291 .iter()
292 .enumerate()
293 .map(|(i, _)| {
294 let start = i * self.dim;
295 self.vectors[start..start + trim_dim].to_vec()
296 })
297 .collect()
298 }
299}
300
/// Accumulates sparse-vector postings for one field: each dimension id maps
/// to the `(doc_id, weight)` pairs seen for it.
struct SparseVectorBuilder {
    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
}
309
310impl SparseVectorBuilder {
311 fn new() -> Self {
312 Self {
313 postings: FxHashMap::default(),
314 }
315 }
316
317 #[inline]
319 fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
320 self.postings
321 .entry(dim_id)
322 .or_default()
323 .push((doc_id, weight));
324 }
325
326 fn is_empty(&self) -> bool {
327 self.postings.is_empty()
328 }
329}
330
331impl SegmentBuilder {
    /// Creates a builder for a fresh segment.
    ///
    /// Opens (truncating) a uniquely-named spool file for the document
    /// store under `config.temp_dir`, and precomputes which text fields are
    /// indexed: each gets a length slot, and position-enabled ones are
    /// recorded with their `PositionMode`.
    ///
    /// # Errors
    /// Returns an error if the spool file cannot be created.
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a contiguous length-table slot to every indexed text field.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        // NOTE(review): `config.interner_capacity` is not applied here
        // (`Rodeo::new()` instead of a with-capacity constructor) — confirm
        // whether that is intentional.
        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }
386
    /// Registers a tokenizer for `field`.
    ///
    /// NOTE(review): `index_text_field` in this file uses an inline
    /// whitespace/lowercase tokenizer and does not consult this map —
    /// confirm where registered tokenizers are applied.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Number of documents added so far (doc ids are assigned sequentially
    /// from 0).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Cached memory estimate.
    ///
    /// NOTE(review): `estimated_memory` is never updated after construction
    /// in this file, so this returns 0; `stats()` computes a real estimate.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
400
    /// Computes a heuristic snapshot of the builder's memory usage.
    ///
    /// All byte figures are estimates from container capacities plus fixed
    /// per-entry overhead guesses (hash-map bucket overhead, average term
    /// length, etc.); they are intended for flush-threshold decisions, not
    /// exact accounting.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();

        // Capacity (not len) so reserved-but-unused space is counted.
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| {
                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
            })
            .sum();

        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        // Rough per-entry hash-map overhead (buckets, control bytes).
        let hashmap_entry_overhead = 24;
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_overhead);

        // Interner estimate assumes ~8 bytes per stored term on average.
        let avg_term_len = 8;
        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * size_of::<DocId>()
                + size_of::<Vec<f32>>()
                + size_of::<Vec<DocId>>();
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_buffer_bytes =
            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);

        // 8 = size of a (DocId, f32) pair; 40 = per-entry map overhead guess.
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * 8 + size_of::<Vec<(DocId, f32)>>();
            }
            sparse_vectors_bytes += builder.postings.len() * 40;
        }
        sparse_vectors_bytes += self.sparse_vectors.len() * 40;

        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes +=
                    positions.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
            }
            position_index_bytes +=
                pos_builder.postings.capacity() * (size_of::<DocId>() + size_of::<Vec<u32>>());
        }
        position_index_bytes += self.position_index.len() * (size_of::<TermKey>() + 40);

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
519
520 pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
522 let doc_id = self.next_doc_id;
523 self.next_doc_id += 1;
524
525 let base_idx = self.doc_field_lengths.len();
527 self.doc_field_lengths
528 .resize(base_idx + self.num_indexed_fields, 0);
529
530 self.current_element_ordinal.clear();
532
533 for (field, value) in doc.field_values() {
534 let entry = self.schema.get_field_entry(*field);
535 if entry.is_none() || !entry.unwrap().indexed {
536 continue;
537 }
538
539 let entry = entry.unwrap();
540 match (&entry.field_type, value) {
541 (FieldType::Text, FieldValue::Text(text)) => {
542 let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
544 let token_count =
545 self.index_text_field(*field, doc_id, text, element_ordinal)?;
546 *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
548
549 let stats = self.field_stats.entry(field.0).or_default();
551 stats.total_tokens += token_count as u64;
552 stats.doc_count += 1;
553
554 if let Some(&slot) = self.field_to_slot.get(&field.0) {
556 self.doc_field_lengths[base_idx + slot] = token_count;
557 }
558 }
559 (FieldType::U64, FieldValue::U64(v)) => {
560 self.index_numeric_field(*field, doc_id, *v)?;
561 }
562 (FieldType::I64, FieldValue::I64(v)) => {
563 self.index_numeric_field(*field, doc_id, *v as u64)?;
564 }
565 (FieldType::F64, FieldValue::F64(v)) => {
566 self.index_numeric_field(*field, doc_id, v.to_bits())?;
567 }
568 (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
569 self.index_dense_vector_field(*field, doc_id, vec)?;
570 }
571 (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
572 self.index_sparse_vector_field(*field, doc_id, entries)?;
573 }
574 _ => {}
575 }
576 }
577
578 self.write_document_to_store(&doc)?;
580
581 Ok(doc_id)
582 }
583
    /// Tokenizes `text` and records term frequencies (and, when enabled,
    /// positions) for `doc_id`. Returns the number of tokens indexed.
    ///
    /// Tokenization is a fixed inline pipeline: split on whitespace, keep
    /// alphanumeric chars, lowercase. NOTE(review): the per-field
    /// `tokenizers` map is not consulted here — confirm intended.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reused scratch map: term -> frequency within this text value.
        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            // Normalize: keep alphanumerics, lowercased (may expand to
            // multiple chars, e.g. 'İ').
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                // Packs element ordinal into the upper bits (<< 20) and the
                // token position into the lower 20 bits.
                // NOTE(review): ordinals >= 2^12 or positions >= 2^20 would
                // overflow/collide — confirm documents stay within range.
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Flush per-value buffers into the segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
            }
        }

        Ok(token_position)
    }
689
    /// Indexes a numeric value as an exact-match term.
    ///
    /// The value (already bit-cast to `u64` by the caller for i64/f64) is
    /// encoded as the term string `__num_<value>` with term frequency 1.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }
708
    /// Buffers a dense vector for `doc_id`.
    ///
    /// The field's dimensionality is fixed by the first vector seen;
    /// subsequent vectors with a different length produce a schema error.
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // len() > 0 guard: a freshly created builder always matches `dim`.
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }
734
735 fn index_sparse_vector_field(
742 &mut self,
743 field: Field,
744 doc_id: DocId,
745 entries: &[(u32, f32)],
746 ) -> Result<()> {
747 let weight_threshold = self
749 .schema
750 .get_field_entry(field)
751 .and_then(|entry| entry.sparse_vector_config.as_ref())
752 .map(|config| config.weight_threshold)
753 .unwrap_or(0.0);
754
755 let builder = self
756 .sparse_vectors
757 .entry(field.0)
758 .or_insert_with(SparseVectorBuilder::new);
759
760 for &(dim_id, weight) in entries {
761 if weight.abs() < weight_threshold {
763 continue;
764 }
765
766 builder.add(dim_id, doc_id, weight);
767 }
768
769 Ok(())
770 }
771
772 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
774 use byteorder::{LittleEndian, WriteBytesExt};
775
776 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
777
778 self.store_file
779 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
780 self.store_file.write_all(&doc_bytes)?;
781
782 Ok(())
783 }
784
    /// Finalizes the segment: consumes the builder, serializes every
    /// artifact, and writes the files through `dir`. Returns the segment
    /// metadata that was also persisted.
    ///
    /// Postings and the compressed document store are built concurrently on
    /// the rayon pool; vector/sparse/position files are written only when
    /// non-empty. The temporary spool file is removed on success.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Make sure every spooled document is on disk before re-reading it.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions must be serialized first: build_postings embeds their
        // (offset, len) into the term dictionary entries.
        let (positions_data, position_offsets) = self.build_positions_file()?;

        // Clone what the store task needs so both closures can run at once.
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        let (postings_result, store_result) = rayon::join(
            || self.build_postings(&position_offsets),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup; Drop also retries this.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
860
    /// Serializes all buffered dense vectors into one file.
    ///
    /// Layout (all little-endian):
    ///   [u32 field_count]
    ///   field_count × [u32 field_id][u8 index_type][u64 offset][u64 len]
    ///   concatenated per-field index payloads (offsets are absolute).
    /// Fields are sorted by id. Returns an empty Vec when nothing to write.
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            // Vectors may be stored wider than they are indexed; trim to the
            // configured index dimension when smaller.
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let flat_data = FlatVectorData {
                dim: index_dim,
                vectors: vectors.clone(),
                doc_ids: builder.doc_ids.clone(),
            };
            let index_bytes = serde_json::to_vec(&flat_data)
                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
            // Type tag 3 = flat vector data — presumably mirrors the
            // reader's index-type enum; confirm against the segment reader.
            let index_type = 3u8;
            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        field_indexes.sort_by_key(|(id, _, _)| *id);

        // 4 count bytes + 21 bytes per directory entry (4+1+8+8).
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }
943
    /// Serializes all buffered sparse-vector postings into one file.
    ///
    /// Layout (all little-endian), fields sorted by id:
    ///   [u32 field_count]
    ///   per field: [u32 field_id][u8 quantization][u32 num_dims]
    ///              num_dims × [u64 offset][u32 len]   (dense dim table;
    ///              absent dims keep (0, 0))
    ///   concatenated per-dimension posting blobs (offsets are absolute).
    /// Returns an empty Vec when nothing to write.
    fn build_sparse_file(&self) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if self.sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        // (field_id, quantization, num_dims, dim_id -> serialized postings)
        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in &self.sparse_vectors {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let quantization = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref())
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in &builder.postings {
                // Block encoding requires doc ids in ascending order.
                let mut sorted_postings = postings.clone();
                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);

                let block_list =
                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
                        .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;

                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header: 4 count bytes, then per field 9 bytes of metadata plus a
        // 12-byte (u64 offset + u32 len) table slot per dimension.
        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*max_dim_id as u64) * 12;
        }

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;

        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        // First pass: lay out payloads and record absolute offsets.
        for (_, _, max_dim_id, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }

            field_tables.push(table);
        }

        // Second pass: emit the header using the recorded tables.
        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        Ok(output)
    }
1071
    /// Serializes the position index and returns the file bytes plus a map
    /// from term-dictionary key (field id LE bytes + term bytes) to the
    /// (offset, len) of that term's position data within the file.
    ///
    /// Entries are written in sorted key order (matching the term
    /// dictionary's ordering). Returns empty output when positions are off.
    #[allow(clippy::type_complexity)]
    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if self.position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        // Materialize sortable byte keys: [field_id LE][term bytes].
        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
            .position_index
            .iter()
            .map(|(term_key, pos_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_list)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in &pos_builder.postings {
                pos_list.push(*doc_id, positions.clone());
            }

            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }
1124
    /// Serializes the inverted index into (term dictionary, postings file).
    ///
    /// Terms are keyed by [field_id LE][term bytes], sorted, then encoded in
    /// parallel. Small lists without positions are inlined directly into the
    /// `TermInfo`; everything else is block-encoded into the postings file,
    /// with position (offset, len) from `position_offsets` attached when
    /// present. Keys must match those built in `build_positions_file`.
    fn build_postings(
        &mut self,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // SSTable insertion below requires ascending key order.
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                // Terms with positions can't be inlined: the TermInfo must
                // carry the external position offsets.
                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);

                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            posting_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(posting_offset, posting_len, doc_count)
                    }
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }
1224
    /// Rebuilds the final compressed document store from the spool file.
    ///
    /// Memory-maps the spool, splits it into length-prefixed records,
    /// deserializes documents in parallel, and feeds them (in order) to the
    /// parallel store writer. Returns the finished store bytes.
    fn build_store_parallel(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        // SAFETY-adjacent: the spool file is private to this builder and no
        // longer written to (flushed in `build`), so the mmap is stable.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Walk the [u32 len][payload] framing written by
        // `write_document_to_store`; stop at any truncated tail.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // NOTE(review): `.ok()` silently drops records that fail to
        // deserialize, which would shift every subsequent document relative
        // to its assigned doc id — confirm corruption here is impossible or
        // handled downstream.
        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
1283}
1284
impl Drop for SegmentBuilder {
    /// Best-effort removal of the temporary spool file (also attempted at
    /// the end of `build`); errors are intentionally ignored.
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}