1use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14
15use hashbrown::HashMap;
16use lasso::{Rodeo, Spur};
17use rayon::prelude::*;
18use rustc_hash::FxHashMap;
19
20use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
21use crate::compression::CompressionLevel;
22use crate::directories::{Directory, DirectoryWriter};
23use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
24use crate::structures::{PostingList, SSTableWriter, TermInfo};
25use crate::tokenizer::BoxedTokenizer;
26use crate::{DocId, Result};
27
28pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
30
31const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; #[derive(Clone, Copy, PartialEq, Eq, Hash)]
36struct TermKey {
37 field: u32,
38 term: Spur,
39}
40
/// One in-memory posting entry: a doc id plus its term frequency,
/// deliberately narrowed to `u16` to keep the in-memory index small.
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}
47
48struct PostingListBuilder {
50 postings: Vec<CompactPosting>,
52}
53
54impl PostingListBuilder {
55 fn new() -> Self {
56 Self {
57 postings: Vec::new(),
58 }
59 }
60
61 #[inline]
63 fn add(&mut self, doc_id: DocId, term_freq: u32) {
64 if let Some(last) = self.postings.last_mut()
66 && last.doc_id == doc_id
67 {
68 last.term_freq = last.term_freq.saturating_add(term_freq as u16);
69 return;
70 }
71 self.postings.push(CompactPosting {
72 doc_id,
73 term_freq: term_freq.min(u16::MAX as u32) as u16,
74 });
75 }
76
77 fn len(&self) -> usize {
78 self.postings.len()
79 }
80}
81
82struct PositionPostingListBuilder {
84 postings: Vec<(DocId, Vec<u32>)>,
86}
87
88impl PositionPostingListBuilder {
89 fn new() -> Self {
90 Self {
91 postings: Vec::new(),
92 }
93 }
94
95 #[inline]
97 fn add_position(&mut self, doc_id: DocId, position: u32) {
98 if let Some((last_doc, positions)) = self.postings.last_mut()
99 && *last_doc == doc_id
100 {
101 positions.push(position);
102 return;
103 }
104 self.postings.push((doc_id, vec![position]));
105 }
106}
107
/// Result of encoding one term's postings: either the whole list fits
/// inline inside the `TermInfo` dictionary entry, or the encoded bytes go
/// into the external postings file and are referenced by offset/length.
enum SerializedPosting {
    /// Postings small enough to live inside the term-dictionary entry.
    Inline(TermInfo),
    /// Block-encoded postings destined for the external postings file.
    External { bytes: Vec<u8>, doc_count: u32 },
}
115
/// Point-in-time snapshot of the builder's size and estimated memory use,
/// as produced by [`SegmentBuilder::stats`].
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far (equals the next doc id to be assigned).
    pub num_docs: u32,
    /// Distinct (field, term) keys in the in-memory inverted index.
    pub unique_terms: usize,
    /// Total posting entries currently buffered across all terms.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Length of the flattened per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Heuristic total of all tracked memory (sum of the breakdown).
    pub estimated_memory_bytes: usize,
    /// Per-component contribution to `estimated_memory_bytes`.
    pub memory_breakdown: MemoryBreakdown,
}
134
/// Per-component memory estimate backing `SegmentBuilderStats`. All values
/// are heuristics computed from capacities, not exact allocator accounting.
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    /// Posting vectors (capacity-based) plus their `Vec` headers.
    pub postings_bytes: usize,
    /// Estimated hashmap key/value/bucket overhead of the inverted index.
    pub index_overhead_bytes: usize,
    /// Estimated term-interner footprint (assumes an average term length).
    pub interner_bytes: usize,
    /// The flattened per-document field-length table.
    pub field_lengths_bytes: usize,
    /// Buffered dense-vector data across all fields.
    pub dense_vectors_bytes: usize,
    /// Number of dense vectors currently buffered.
    pub dense_vector_count: usize,
}
151
/// Tuning knobs for `SegmentBuilder`.
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary spill file holding serialized documents.
    pub temp_dir: PathBuf,
    /// Compression level used when rewriting the document store.
    pub compression_level: CompressionLevel,
    /// Worker threads for parallel store compression.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    /// NOTE(review): `SegmentBuilder::new` calls `Rodeo::new()` and never
    /// reads this field — confirm whether preallocation was intended.
    pub interner_capacity: usize,
    /// Initial capacity of the in-memory inverted-index hashmap.
    pub posting_map_capacity: usize,
}
166
167impl Default for SegmentBuilderConfig {
168 fn default() -> Self {
169 Self {
170 temp_dir: std::env::temp_dir(),
171 compression_level: CompressionLevel(7),
172 num_compression_threads: num_cpus::get(),
173 interner_capacity: 1_000_000,
174 posting_map_capacity: 500_000,
175 }
176 }
177}
178
/// Accumulates documents in memory (postings, positions, vectors) and on a
/// temporary spill file (serialized documents), then writes a complete
/// segment through [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    // Registered per-field tokenizers.
    // NOTE(review): `index_text_field` uses a built-in whitespace/lowercase
    // tokenizer and never reads this map — confirm where these apply.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    // Interns term strings so posting keys carry a fixed-size `Spur`.
    term_interner: Rodeo,

    // (field, term) -> postings accumulated in memory.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    // Buffered writer for the temp document store at `store_path`;
    // removed in `build` and again (best-effort) in `Drop`.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    // Next doc id to assign; doubles as the running document count.
    next_doc_id: DocId,

    // Per-field token/document statistics, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    // Flattened token counts: `num_indexed_fields` slots per document.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    // Field id -> slot index within one document's length row.
    field_to_slot: FxHashMap<u32, usize>,

    // Scratch: term -> frequency for the text value being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    // Scratch: reused buffer for the normalized token being built.
    token_buffer: String,

    // Per-field dense-vector accumulators, keyed by field id.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    // Per-field sparse-vector accumulators, keyed by field id.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    // (field, term) -> per-document position lists (position-enabled fields only).
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    // Field id -> configured position mode; only fields with positions
    // enabled get an entry.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    // Scratch: per-field ordinal of the value within the current document,
    // cleared at the start of each `add_document`.
    current_element_ordinal: FxHashMap<u32, u32>,
}
238
239struct DenseVectorBuilder {
241 dim: usize,
243 doc_ids: Vec<DocId>,
245 vectors: Vec<f32>,
247}
248
249impl DenseVectorBuilder {
250 fn new(dim: usize) -> Self {
251 Self {
252 dim,
253 doc_ids: Vec::new(),
254 vectors: Vec::new(),
255 }
256 }
257
258 fn add(&mut self, doc_id: DocId, vector: &[f32]) {
259 debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
260 self.doc_ids.push(doc_id);
261 self.vectors.extend_from_slice(vector);
262 }
263
264 fn len(&self) -> usize {
265 self.doc_ids.len()
266 }
267
268 fn get_vectors(&self) -> Vec<Vec<f32>> {
270 self.doc_ids
271 .iter()
272 .enumerate()
273 .map(|(i, _)| {
274 let start = i * self.dim;
275 self.vectors[start..start + self.dim].to_vec()
276 })
277 .collect()
278 }
279
280 fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
282 debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
283 self.doc_ids
284 .iter()
285 .enumerate()
286 .map(|(i, _)| {
287 let start = i * self.dim;
288 self.vectors[start..start + trim_dim].to_vec()
289 })
290 .collect()
291 }
292}
293
294struct SparseVectorBuilder {
299 postings: FxHashMap<u32, Vec<(DocId, f32)>>,
301}
302
303impl SparseVectorBuilder {
304 fn new() -> Self {
305 Self {
306 postings: FxHashMap::default(),
307 }
308 }
309
310 #[inline]
312 fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
313 self.postings
314 .entry(dim_id)
315 .or_default()
316 .push((doc_id, weight));
317 }
318
319 fn is_empty(&self) -> bool {
320 self.postings.is_empty()
321 }
322}
323
324impl SegmentBuilder {
325 pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
327 let segment_id = uuid::Uuid::new_v4();
328 let store_path = config
329 .temp_dir
330 .join(format!("hermes_store_{}.tmp", segment_id));
331
332 let store_file = BufWriter::with_capacity(
333 STORE_BUFFER_SIZE,
334 OpenOptions::new()
335 .create(true)
336 .write(true)
337 .truncate(true)
338 .open(&store_path)?,
339 );
340
341 let mut num_indexed_fields = 0;
344 let mut field_to_slot = FxHashMap::default();
345 let mut position_enabled_fields = FxHashMap::default();
346 for (field, entry) in schema.fields() {
347 if entry.indexed && matches!(entry.field_type, FieldType::Text) {
348 field_to_slot.insert(field.0, num_indexed_fields);
349 num_indexed_fields += 1;
350 if entry.positions.is_some() {
351 position_enabled_fields.insert(field.0, entry.positions);
352 }
353 }
354 }
355
356 Ok(Self {
357 schema,
358 tokenizers: FxHashMap::default(),
359 term_interner: Rodeo::new(),
360 inverted_index: HashMap::with_capacity(config.posting_map_capacity),
361 store_file,
362 store_path,
363 next_doc_id: 0,
364 field_stats: FxHashMap::default(),
365 doc_field_lengths: Vec::new(),
366 num_indexed_fields,
367 field_to_slot,
368 local_tf_buffer: FxHashMap::default(),
369 token_buffer: String::with_capacity(64),
370 config,
371 dense_vectors: FxHashMap::default(),
372 sparse_vectors: FxHashMap::default(),
373 position_index: HashMap::new(),
374 position_enabled_fields,
375 current_element_ordinal: FxHashMap::default(),
376 })
377 }
378
    /// Register the tokenizer to use for `field`.
    /// NOTE(review): the built-in tokenization in `index_text_field` does
    /// not consult this map — confirm where registered tokenizers apply.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Number of documents added so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
386
387 pub fn stats(&self) -> SegmentBuilderStats {
389 use std::mem::size_of;
390
391 let postings_in_memory: usize =
392 self.inverted_index.values().map(|p| p.postings.len()).sum();
393
394 let compact_posting_size = size_of::<CompactPosting>();
397
398 let postings_bytes: usize = self
400 .inverted_index
401 .values()
402 .map(|p| {
403 p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
404 })
405 .sum();
406
407 let term_key_size = size_of::<TermKey>();
411 let posting_builder_size = size_of::<PostingListBuilder>();
412 let hashmap_entry_overhead = 24; let index_overhead_bytes = self.inverted_index.len()
414 * (term_key_size + posting_builder_size + hashmap_entry_overhead);
415
416 let avg_term_len = 8;
420 let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
421 let interner_bytes =
422 self.term_interner.len() * (avg_term_len + interner_overhead_per_string);
423
424 let field_lengths_bytes =
426 self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
427
428 let mut dense_vectors_bytes: usize = 0;
430 let mut dense_vector_count: usize = 0;
431 for b in self.dense_vectors.values() {
432 dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
434 + b.doc_ids.capacity() * size_of::<DocId>()
435 + size_of::<Vec<f32>>()
436 + size_of::<Vec<DocId>>();
437 dense_vector_count += b.doc_ids.len();
438 }
439
440 let local_tf_buffer_bytes =
442 self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);
443
444 let estimated_memory_bytes = postings_bytes
445 + index_overhead_bytes
446 + interner_bytes
447 + field_lengths_bytes
448 + dense_vectors_bytes
449 + local_tf_buffer_bytes;
450
451 let memory_breakdown = MemoryBreakdown {
452 postings_bytes,
453 index_overhead_bytes,
454 interner_bytes,
455 field_lengths_bytes,
456 dense_vectors_bytes,
457 dense_vector_count,
458 };
459
460 SegmentBuilderStats {
461 num_docs: self.next_doc_id,
462 unique_terms: self.inverted_index.len(),
463 postings_in_memory,
464 interned_strings: self.term_interner.len(),
465 doc_field_lengths_size: self.doc_field_lengths.len(),
466 estimated_memory_bytes,
467 memory_breakdown,
468 }
469 }
470
471 pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
473 let doc_id = self.next_doc_id;
474 self.next_doc_id += 1;
475
476 let base_idx = self.doc_field_lengths.len();
478 self.doc_field_lengths
479 .resize(base_idx + self.num_indexed_fields, 0);
480
481 self.current_element_ordinal.clear();
483
484 for (field, value) in doc.field_values() {
485 let entry = self.schema.get_field_entry(*field);
486 if entry.is_none() || !entry.unwrap().indexed {
487 continue;
488 }
489
490 let entry = entry.unwrap();
491 match (&entry.field_type, value) {
492 (FieldType::Text, FieldValue::Text(text)) => {
493 let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
495 let token_count =
496 self.index_text_field(*field, doc_id, text, element_ordinal)?;
497 *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
499
500 let stats = self.field_stats.entry(field.0).or_default();
502 stats.total_tokens += token_count as u64;
503 stats.doc_count += 1;
504
505 if let Some(&slot) = self.field_to_slot.get(&field.0) {
507 self.doc_field_lengths[base_idx + slot] = token_count;
508 }
509 }
510 (FieldType::U64, FieldValue::U64(v)) => {
511 self.index_numeric_field(*field, doc_id, *v)?;
512 }
513 (FieldType::I64, FieldValue::I64(v)) => {
514 self.index_numeric_field(*field, doc_id, *v as u64)?;
515 }
516 (FieldType::F64, FieldValue::F64(v)) => {
517 self.index_numeric_field(*field, doc_id, v.to_bits())?;
518 }
519 (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
520 self.index_dense_vector_field(*field, doc_id, vec)?;
521 }
522 (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
523 self.index_sparse_vector_field(*field, doc_id, entries)?;
524 }
525 _ => {}
526 }
527 }
528
529 self.write_document_to_store(&doc)?;
531
532 Ok(doc_id)
533 }
534
    /// Tokenize `text` and merge its terms into the inverted index (and the
    /// position index when the field records positions).
    ///
    /// Tokenization here is built in: split on whitespace, keep only
    /// alphanumeric characters, lowercase. NOTE(review): the tokenizer
    /// registered via `set_tokenizer` is not consulted — confirm intended.
    ///
    /// Returns the number of tokens produced (used as the field length).
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        // Position mode for this field, if positions are enabled at all.
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Reused scratch map: term -> frequency within this text value.
        self.local_tf_buffer.clear();

        // Per-term positions for this value (filled only when enabled).
        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            // Build the normalized token in the reused buffer.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    // to_lowercase may yield multiple chars (e.g. 'İ').
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            // Words with no alphanumeric chars produce no token and do not
            // advance the token position.
            if self.token_buffer.is_empty() {
                continue;
            }

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                // Encoding: element ordinal in bits >= 20, token position in
                // the low bits. NOTE(review): token_position >= 2^20 or
                // ordinal >= 2^12 would overlap bit ranges — confirm inputs
                // stay within these limits.
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Merge this value's term frequencies (and positions) into the
        // segment-wide indexes, one entry per distinct term.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
            }
        }

        // The token count doubles as the field length for this value.
        Ok(token_position)
    }
640
641 fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
642 let term_str = format!("__num_{}", value);
644 let term_spur = self.term_interner.get_or_intern(&term_str);
645
646 let term_key = TermKey {
647 field: field.0,
648 term: term_spur,
649 };
650
651 let posting = self
652 .inverted_index
653 .entry(term_key)
654 .or_insert_with(PostingListBuilder::new);
655 posting.add(doc_id, 1);
656
657 Ok(())
658 }
659
660 fn index_dense_vector_field(
662 &mut self,
663 field: Field,
664 doc_id: DocId,
665 vector: &[f32],
666 ) -> Result<()> {
667 let dim = vector.len();
668
669 let builder = self
670 .dense_vectors
671 .entry(field.0)
672 .or_insert_with(|| DenseVectorBuilder::new(dim));
673
674 if builder.dim != dim && builder.len() > 0 {
676 return Err(crate::Error::Schema(format!(
677 "Dense vector dimension mismatch: expected {}, got {}",
678 builder.dim, dim
679 )));
680 }
681
682 builder.add(doc_id, vector);
683 Ok(())
684 }
685
686 fn index_sparse_vector_field(
693 &mut self,
694 field: Field,
695 doc_id: DocId,
696 entries: &[(u32, f32)],
697 ) -> Result<()> {
698 let weight_threshold = self
700 .schema
701 .get_field_entry(field)
702 .and_then(|entry| entry.sparse_vector_config.as_ref())
703 .map(|config| config.weight_threshold)
704 .unwrap_or(0.0);
705
706 let builder = self
707 .sparse_vectors
708 .entry(field.0)
709 .or_insert_with(SparseVectorBuilder::new);
710
711 for &(dim_id, weight) in entries {
712 if weight.abs() < weight_threshold {
714 continue;
715 }
716
717 builder.add(dim_id, doc_id, weight);
718 }
719
720 Ok(())
721 }
722
723 fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
725 use byteorder::{LittleEndian, WriteBytesExt};
726
727 let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
728
729 self.store_file
730 .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
731 self.store_file.write_all(&doc_bytes)?;
732
733 Ok(())
734 }
735
    /// Finalize the segment: build every on-disk structure and write the
    /// files through `dir`, returning the segment's metadata.
    ///
    /// Postings and the compressed document store are built concurrently
    /// via `rayon::join`. Consumes the builder; the temp store file is
    /// removed before returning (and again, best-effort, in `Drop`).
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush buffered records so build_store_parallel sees the complete
        // temp file when it re-reads (mmaps) it.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions first: build_postings embeds each term's (offset, len)
        // into its TermInfo, so the offsets must exist before postings.
        let (positions_data, position_offsets) = self.build_positions_file()?;

        // Clone what the store task needs; `self` moves into the postings
        // closure below.
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Both halves are CPU-bound; run them in parallel.
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(&position_offsets),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Optional files are only written when non-empty.
        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup; `Drop` retries if this fails.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
811
812 fn build_vectors_file(&self) -> Result<Vec<u8>> {
819 use byteorder::{LittleEndian, WriteBytesExt};
820
821 let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
823
824 for (&field_id, builder) in &self.dense_vectors {
825 if builder.len() == 0 {
826 continue;
827 }
828
829 let field = crate::dsl::Field(field_id);
830
831 let dense_config = self
833 .schema
834 .get_field_entry(field)
835 .and_then(|e| e.dense_vector_config.as_ref());
836
837 let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
839 let vectors = if index_dim < builder.dim {
840 builder.get_vectors_trimmed(index_dim)
842 } else {
843 builder.get_vectors()
844 };
845
846 let flat_data = FlatVectorData {
850 dim: index_dim,
851 vectors: vectors.clone(),
852 doc_ids: builder.doc_ids.clone(),
853 };
854 let index_bytes = serde_json::to_vec(&flat_data)
855 .map_err(|e| crate::Error::Serialization(e.to_string()))?;
856 let index_type = 3u8; field_indexes.push((field_id, index_type, index_bytes));
859 }
860
861 if field_indexes.is_empty() {
862 return Ok(Vec::new());
863 }
864
865 field_indexes.sort_by_key(|(id, _, _)| *id);
867
868 let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
870
871 let mut output = Vec::new();
873
874 output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
876
877 let mut current_offset = header_size as u64;
879 for (field_id, index_type, data) in &field_indexes {
880 output.write_u32::<LittleEndian>(*field_id)?;
881 output.write_u8(*index_type)?;
882 output.write_u64::<LittleEndian>(current_offset)?;
883 output.write_u64::<LittleEndian>(data.len() as u64)?;
884 current_offset += data.len() as u64;
885 }
886
887 for (_, _, data) in field_indexes {
889 output.extend_from_slice(&data);
890 }
891
892 Ok(output)
893 }
894
    /// Build the sparse-vector file.
    ///
    /// Layout: `u32` field count; then for each field a header (field_id
    /// u32, quantization u8, table size u32) followed by a dense
    /// per-dimension table of (offset u64, len u32); then all encoded
    /// posting payloads. Dimensions without postings keep a (0, 0) entry.
    fn build_sparse_file(&self) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if self.sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        // (field_id, quantization, dim-table size, dim -> encoded postings)
        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in &self.sparse_vectors {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            // Quantization comes from the schema; Float32 when unset.
            let quantization = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref())
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in &builder.postings {
                // Posting lists must be doc-id ordered before block encoding.
                let mut sorted_postings = postings.clone();
                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);

                let block_list =
                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
                        .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;

                dim_bytes.insert(dim_id, bytes);
            }

            // The table spans dims 0..=max_dim_id, hence the +1.
            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        // Deterministic output: fields sorted by id.
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Pre-compute the header size so payload offsets can be absolute.
        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            // Fixed per-field part: field_id (4) + quantization (1) + size (4).
            header_size += 4 + 1 + 4;
            // Per-dimension table entry: offset (8) + length (4).
            header_size += (*max_dim_id as u64) * 12;
        }

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;

        // First pass: concatenate payloads, recording each dimension's
        // absolute (offset, len) in a per-field table.
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        for (_, _, max_dim_id, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }

            field_tables.push(table);
        }

        // Second pass: emit each field's header and dimension table.
        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        // Payload bytes follow the header region.
        output.extend_from_slice(&all_data);

        Ok(output)
    }
1022
    /// Build the positions file: for every (field, term) with recorded
    /// positions, serialize its `PositionPostingList` and remember the
    /// resulting (offset, len) keyed by the term-dictionary key bytes.
    ///
    /// Returns the file bytes plus the offset map consumed by
    /// `build_postings`. Both are empty when no field records positions.
    #[allow(clippy::type_complexity)]
    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if self.position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        // Key = field id (LE bytes) + raw term bytes — identical to the
        // term-dictionary key format so the offsets can be joined later.
        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
            .position_index
            .iter()
            .map(|(term_key, pos_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_list)
            })
            .collect();

        // Deterministic layout: serialize in ascending key order.
        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in &pos_builder.postings {
                pos_list.push(*doc_id, positions.clone());
            }

            // Offsets are relative to the start of the positions file.
            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }
1075
    /// Build the term dictionary (SSTable) and postings file from the
    /// in-memory inverted index.
    ///
    /// Small posting lists without positions are inlined into their
    /// `TermInfo` entry; everything else is block-encoded into the postings
    /// buffer and referenced by (offset, len), with position references
    /// joined in from `position_offsets`.
    fn build_postings(
        &mut self,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        // Materialize (key bytes, postings) pairs; key = field id LE bytes
        // + term bytes, matching the positions-file key format.
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // The SSTable writer requires keys in ascending order.
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Encode every term's postings in parallel.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                // Terms with positions are never inlined: the TermInfo must
                // carry an external position reference.
                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    // `expect`: failure here is an encoding bug, not bad
                    // user input, and `?` is awkward inside the par map.
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        // Sequential write-out: postings bytes are appended in key order,
        // so each entry's offset is just the buffer length at insert time.
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);

                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            posting_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(posting_offset, posting_len, doc_count)
                    }
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }
1175
1176 fn build_store_parallel(
1180 store_path: &PathBuf,
1181 schema: &Schema,
1182 num_compression_threads: usize,
1183 compression_level: CompressionLevel,
1184 ) -> Result<Vec<u8>> {
1185 use super::store::EagerParallelStoreWriter;
1186
1187 let file = File::open(store_path)?;
1188 let mmap = unsafe { memmap2::Mmap::map(&file)? };
1189
1190 let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1192 let mut offset = 0usize;
1193 while offset + 4 <= mmap.len() {
1194 let doc_len = u32::from_le_bytes([
1195 mmap[offset],
1196 mmap[offset + 1],
1197 mmap[offset + 2],
1198 mmap[offset + 3],
1199 ]) as usize;
1200 offset += 4;
1201
1202 if offset + doc_len > mmap.len() {
1203 break;
1204 }
1205
1206 doc_ranges.push((offset, doc_len));
1207 offset += doc_len;
1208 }
1209
1210 let docs: Vec<Document> = doc_ranges
1212 .into_par_iter()
1213 .filter_map(|(start, len)| {
1214 let doc_bytes = &mmap[start..start + len];
1215 super::store::deserialize_document(doc_bytes, schema).ok()
1216 })
1217 .collect();
1218
1219 let mut store_data = Vec::new();
1221 let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1222 &mut store_data,
1223 num_compression_threads,
1224 compression_level,
1225 );
1226
1227 for doc in &docs {
1228 store_writer.store(doc, schema)?;
1229 }
1230
1231 store_writer.finish()?;
1232 Ok(store_data)
1233 }
1234}
1235
1236impl Drop for SegmentBuilder {
1237 fn drop(&mut self) {
1238 let _ = std::fs::remove_file(&self.store_path);
1240 }
1241}