use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use serde::{Deserialize, Serialize};

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

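/// Uncompressed dense-vector data for one field as laid out in the vectors
/// file: `vectors[i]` has length `dim` and belongs to document `doc_ids[i]`.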
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlatVectorData {
    pub dim: usize,
    pub vectors: Vec<Vec<f32>>,
    pub doc_ids: Vec<u32>,
}

/// Bundle of an IVF-RaBitQ index with its coarse centroids and codebook,
/// serialized together as one JSON blob.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IVFRaBitQIndexData {
    pub index: crate::structures::IVFRaBitQIndex,
    pub centroids: crate::structures::CoarseCentroids,
    pub codebook: crate::structures::RaBitQCodebook,
}

impl IVFRaBitQIndexData {
    pub fn to_bytes(&self) -> std::io::Result<Vec<u8>> {
        serde_json::to_vec(self)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    pub fn from_bytes(data: &[u8]) -> std::io::Result<Self> {
        serde_json::from_slice(data)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }
}

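/// Bundle of a ScaNN-style IVF-PQ index with its coarse centroids and PQ
/// codebook, serialized together as one JSON blob.
///
/// A round-trip sketch (assumes an already-built `data: ScaNNIndexData`):
///
/// ```ignore
/// let bytes = data.to_bytes()?;
/// let restored = ScaNNIndexData::from_bytes(&bytes)?;
/// ```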
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScaNNIndexData {
    pub index: crate::structures::IVFPQIndex,
    pub centroids: crate::structures::CoarseCentroids,
    pub codebook: crate::structures::PQCodebook,
}

impl ScaNNIndexData {
    pub fn to_bytes(&self) -> std::io::Result<Vec<u8>> {
        serde_json::to_vec(self)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }

    pub fn from_bytes(data: &[u8]) -> std::io::Result<Self> {
        serde_json::from_slice(data)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
    }
}

/// Write buffer size for the temporary document store (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Key identifying a posting list: a field id plus an interned term.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// A doc-id + term-frequency pair, kept compact (u16 tf) to save memory.
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

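/// Accumulates postings for one (field, term) pair. Doc IDs arrive in
/// non-decreasing order, so a repeated ID only bumps the stored term
/// frequency:
///
/// ```ignore
/// let mut b = PostingListBuilder::new();
/// b.add(7, 1);
/// b.add(7, 2); // same doc: tf becomes 3, still a single posting
/// assert_eq!(b.len(), 1);
/// ```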
struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

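/// Per-(field, term) position postings: a list of (doc_id, encoded positions)
/// pairs, filled in document order while indexing. Consecutive positions for
/// the same document are appended to the same entry.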
struct PositionPostingListBuilder {
    postings: Vec<(DocId, Vec<u32>)>,
}

impl PositionPostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    #[inline]
    fn add_position(&mut self, doc_id: DocId, position: u32) {
        if let Some((last_doc, positions)) = self.postings.last_mut()
            && *last_doc == doc_id
        {
            positions.push(position);
            return;
        }
        self.postings.push((doc_id, vec![position]));
    }
}

enum SerializedPosting {
    /// Posting list small enough to live directly in the term dictionary.
    Inline(TermInfo),
    /// Serialized block posting list destined for the postings file.
    External { bytes: Vec<u8>, doc_count: u32 },
}

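/// Point-in-time snapshot of the builder's size, exposed by
/// `SegmentBuilder::stats`. A sketch of using it to cap memory (the budget
/// value is illustrative):
///
/// ```ignore
/// if builder.stats().estimated_memory_bytes > 512 * 1024 * 1024 {
///     // Flush: finish this segment and start a new builder.
/// }
/// ```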
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    pub num_docs: u32,
    pub unique_terms: usize,
    pub postings_in_memory: usize,
    pub interned_strings: usize,
    pub doc_field_lengths_size: usize,
    pub estimated_memory_bytes: usize,
    pub memory_breakdown: MemoryBreakdown,
}

/// Where the estimated bytes come from, per subsystem.
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    pub postings_bytes: usize,
    pub index_overhead_bytes: usize,
    pub interner_bytes: usize,
    pub field_lengths_bytes: usize,
    pub dense_vectors_bytes: usize,
    pub dense_vector_count: usize,
}

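/// Tuning knobs for `SegmentBuilder`.
///
/// A sketch overriding selected defaults (the values are illustrative):
///
/// ```ignore
/// let config = SegmentBuilderConfig {
///     compression_level: CompressionLevel(3), // faster, larger store
///     num_compression_threads: 4,
///     ..Default::default()
/// };
/// ```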
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    pub temp_dir: PathBuf,
    pub compression_level: CompressionLevel,
    pub num_compression_threads: usize,
    pub interner_capacity: usize,
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

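/// Streaming builder for a single immutable segment: documents go in one at a
/// time, postings and vectors accumulate in memory (stored fields spill to a
/// temp file), and `build` writes the final segment files.
///
/// A lifecycle sketch (schema and directory setup elided; exact `SegmentId`
/// construction may differ in this crate):
///
/// ```ignore
/// let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
/// for doc in docs {
///     builder.add_document(doc)?;
/// }
/// let meta = builder.build(&dir, segment_id).await?;
/// ```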
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` keys.
    term_interner: Rodeo,

    /// In-memory inverted index: (field, term) -> postings under construction.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Spill file for stored documents, re-read and compressed at build time.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    /// Per-field token statistics.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: `num_indexed_fields` slots per doc.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch buffer: term -> frequency within the current text value.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer for the token currently being normalized.
    token_buffer: String,

    /// Per-field dense vector accumulators.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Per-field sparse vector accumulators.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Positions per (field, term), for fields with positions enabled.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields with positions enabled, and their position mode.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Ordinal of the current element per multi-valued field, reset per document.
    current_element_ordinal: FxHashMap<u32, u32>,
}

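/// Accumulates dense vectors for one field in a single flat buffer: the
/// vector for `doc_ids[i]` occupies `vectors[i * dim..(i + 1) * dim]`.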
struct DenseVectorBuilder {
    dim: usize,
    doc_ids: Vec<DocId>,
    vectors: Vec<f32>,
}

impl DenseVectorBuilder {
    fn new(dim: usize) -> Self {
        Self {
            dim,
            doc_ids: Vec::new(),
            vectors: Vec::new(),
        }
    }

    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
        self.doc_ids.push(doc_id);
        self.vectors.extend_from_slice(vector);
    }

    fn len(&self) -> usize {
        self.doc_ids.len()
    }

    fn get_vectors(&self) -> Vec<Vec<f32>> {
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + self.dim].to_vec()
            })
            .collect()
    }

    /// Like `get_vectors`, but keeps only the first `trim_dim` components of
    /// each vector (a prefix of the stored embedding).
    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + trim_dim].to_vec()
            })
            .collect()
    }
}

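/// Accumulates sparse vectors for one field as per-dimension postings, the
/// same shape an inverted index uses for terms: `add(dim_id, doc_id, w)`
/// appends `(doc_id, w)` to the posting list for `dim_id`.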
struct SparseVectorBuilder {
    /// dim_id -> (doc_id, weight) postings, in insertion (document) order.
    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
}

impl SparseVectorBuilder {
    fn new() -> Self {
        Self {
            postings: FxHashMap::default(),
        }
    }

    #[inline]
    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
        self.postings
            .entry(dim_id)
            .or_default()
            .push((doc_id, weight));
    }

    fn is_empty(&self) -> bool {
        self.postings.is_empty()
    }
}

impl SegmentBuilder {
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign a contiguous slot to every indexed text field so per-document
        // field lengths can live in one flat Vec, and record which fields have
        // positions enabled.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Estimates the builder's current memory footprint from container
    /// capacities plus fixed per-entry overheads; see `MemoryBreakdown`.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| {
                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
            })
            .sum();

        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let hashmap_entry_overhead = 24; // rough per-entry hash table overhead
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_overhead);

        let avg_term_len = 8; // assumed average interned term length in bytes
        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * size_of::<DocId>()
                + size_of::<Vec<f32>>()
                + size_of::<Vec<DocId>>();
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_buffer_bytes =
            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

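    /// Assigns the next `DocId`, indexes every indexed field by type, and
    /// appends the stored form to the temp store file.
    ///
    /// A sketch (`Document` construction is abbreviated; `add_text` stands in
    /// for however field values are attached in this crate):
    ///
    /// ```ignore
    /// let mut doc = Document::default();
    /// doc.add_text(title_field, "hello world");
    /// let doc_id = builder.add_document(doc)?;
    /// assert_eq!(doc_id + 1, builder.num_docs());
    /// ```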
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve per-field length slots for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        // Element ordinals restart for every document (multi-valued fields).
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() || !entry.unwrap().indexed {
                continue;
            }

            let entry = entry.unwrap();
            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    self.index_dense_vector_field(*field, doc_id, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    self.index_sparse_vector_field(*field, doc_id, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

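    /// Tokenizes `text` (split on whitespace, keep lowercased alphanumerics),
    /// accumulates per-term frequencies locally, then merges them into the
    /// inverted index. Returns the token count.
    ///
    /// With positions enabled, each position is encoded per the field's
    /// `PositionMode`. `Full` packs the element ordinal into the high bits:
    /// ordinal 2, token position 5 encode as `(2 << 20) | 5 = 0x20_0005`, so
    /// the scheme assumes token positions stay below 2^20.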
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Accumulate term frequencies for this value locally, then merge once
        // per unique term instead of once per token.
        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            // Inline tokenizer: keep alphanumerics, lowercased.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                let encoded_pos = match mode {
                    // Element ordinal only, in the high bits.
                    PositionMode::Ordinal => element_ordinal << 20,
                    // Token position only.
                    PositionMode::TokenPosition => token_position,
                    // Ordinal in the high bits, token position in the low 20.
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Merge the local buffers into the segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
            }
        }

        Ok(token_position)
    }

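    /// Indexes a numeric value as an exact-match term: for example, value 42
    /// becomes the interned term `__num_42` with term frequency 1.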
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Buffers a dense vector; the first vector seen for a field fixes the
    /// field's dimension, and later mismatches are rejected.
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }

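    /// Buffers sparse-vector entries, dropping weights whose magnitude is
    /// below the field's configured `weight_threshold` (0.0 by default, i.e.
    /// keep everything).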
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            builder.add(dim_id, doc_id, weight);
        }

        Ok(())
    }

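    /// Appends the serialized document to the temp store with simple length
    /// framing: a little-endian `u32` byte count, then the payload.
    /// `build_store_parallel` later walks this framing to recover documents.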
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

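    /// Finalizes the segment: builds the positions file, then postings and
    /// the compressed store in parallel via `rayon::join`, writes all segment
    /// files through `dir`, and removes the temp store.
    ///
    /// ```ignore
    /// let meta = builder.build(&dir, segment_id).await?;
    /// println!("segment {} has {} docs", meta.id, meta.num_docs);
    /// ```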
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Positions come first so postings can embed their offsets.
        let (positions_data, position_offsets) = self.build_positions_file()?;

        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Postings and the compressed store are independent: build in parallel.
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(&position_offsets),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort removal; Drop tries again if this fails.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

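    /// Serializes all dense-vector fields into one buffer:
    ///
    /// ```text
    /// [num_fields: u32 LE]
    /// per field: [field_id: u32][index_type: u8][offset: u64][len: u64]
    /// ...concatenated per-field index bytes...
    /// ```
    ///
    /// Vectors are trimmed to the schema's `index_dim()` when it is smaller
    /// than the stored dimension.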
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            // If the schema indexes a prefix of the stored dimension, trim
            // each vector down to `index_dim` before serializing.
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let flat_data = FlatVectorData {
                dim: index_dim,
                vectors: vectors.clone(),
                doc_ids: builder.doc_ids.clone(),
            };
            let index_bytes = serde_json::to_vec(&flat_data)
                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
            // Type tag 3 = flat (unquantized) vector data (see `FlatVectorData`).
            let index_type = 3u8;
            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        field_indexes.sort_by_key(|(id, _, _)| *id);

        // Header: field count + one (field_id, type, offset, len) entry per field.
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }

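    /// Serializes all sparse-vector fields into one buffer:
    ///
    /// ```text
    /// [num_fields: u32 LE]
    /// per field: [field_id: u32][quantization: u8][num_dims: u32]
    ///            then num_dims entries of [offset: u64][len: u32]
    /// ...concatenated per-dimension posting-list bytes...
    /// ```
    ///
    /// Dimensions with no postings keep `(0, 0)` in the offset table.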
    fn build_sparse_file(&self) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if self.sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        // (field_id, quantization, num_dims, per-dim serialized posting lists)
        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in &self.sparse_vectors {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let quantization = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref())
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in &builder.postings {
                // Posting lists must be doc-id ordered before block encoding.
                let mut sorted_postings = postings.clone();
                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);

                let block_list =
                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
                        .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;

                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header: field count, then per field a fixed descriptor plus a dense
        // (offset, len) table indexed by dim_id.
        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            header_size += 4 + 1 + 4; // field_id (u32) + quantization (u8) + num_dims (u32)
            header_size += (*max_dim_id as u64) * 12; // per dim: offset (u64) + len (u32)
        }

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;

        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        for (_, _, max_dim_id, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
                // Dims with no postings keep (0, 0) in the table.
            }

            field_tables.push(table);
        }

        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        Ok(output)
    }

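    /// Serializes position posting lists sorted by key (field id as LE bytes,
    /// then the term bytes) and returns the buffer together with a
    /// key -> (offset, len) map that `build_postings` embeds in `TermInfo`s.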
    #[allow(clippy::type_complexity)]
    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if self.position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        // Materialize sortable keys: field id (LE bytes) followed by the term.
        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
            .position_index
            .iter()
            .map(|(term_key, pos_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_list)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in &pos_builder.postings {
                pos_list.push(*doc_id, positions.clone());
            }

            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }

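    /// Builds the SSTable term dictionary and the postings file. Keys are
    /// sorted and posting lists serialized in parallel; lists that fit
    /// `TermInfo::try_inline` (and carry no positions) are stored directly in
    /// the dictionary instead of the postings file.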
    fn build_postings(
        &mut self,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        // Same key scheme as the positions file: field id (LE) + term bytes.
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize posting lists in parallel; small lists without positions
        // are inlined into the term dictionary.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        // Write the sorted keys into the SSTable term dictionary.
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);

                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            posting_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(posting_offset, posting_len, doc_count)
                    }
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

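    /// Re-reads the temp store through an mmap, walks the `u32` length
    /// framing to find document boundaries, deserializes documents in
    /// parallel, and recompresses them into the final store format.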
    fn build_store_parallel(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Walk the u32-length framing to find each document's byte range.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Deserialize documents in parallel.
        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}