use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

/// Buffer size for the temporary on-disk document store (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Key into the in-memory inverted index: an indexed field paired with an
/// interned term.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// A single in-memory posting. Term frequencies are clamped to `u16` to
/// halve the per-posting footprint.
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// Accumulates the postings for a single term, in ascending doc-id order.
struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Records one or more occurrences of the term in `doc_id`. Documents
    /// arrive in increasing order, so a repeated `doc_id` can only be the
    /// most recent entry; its frequency saturates instead of wrapping.
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}
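
// Illustrative sketch (not part of the original file): how `add` coalesces
// repeated doc ids while clamping frequencies.
//
//     let mut b = PostingListBuilder::new();
//     b.add(7, 2);
//     b.add(7, 3); // same doc: term_freq accumulates to 5
//     b.add(9, 1); // new doc: a second posting is appended
//     assert_eq!(b.len(), 2);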

/// Accumulates per-document token positions for a single term. Unlike
/// `PostingListBuilder`, every position is kept, so this is only populated
/// for fields with positions enabled.
struct PositionPostingListBuilder {
    postings: Vec<(DocId, Vec<u32>)>,
}

impl PositionPostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Appends an encoded position for `doc_id`, starting a new entry when
    /// the doc id changes.
    #[inline]
    fn add_position(&mut self, doc_id: DocId, position: u32) {
        if let Some((last_doc, positions)) = self.postings.last_mut()
            && *last_doc == doc_id
        {
            positions.push(position);
            return;
        }
        self.postings.push((doc_id, vec![position]));
    }
}
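
// Illustrative sketch (not part of the original file): positions for the
// same document are grouped into one entry.
//
//     let mut p = PositionPostingListBuilder::new();
//     p.add_position(3, 0);
//     p.add_position(3, 4); // appended to doc 3's position list
//     p.add_position(5, 1); // new (doc_id, positions) entry
//     assert_eq!(p.postings, vec![(3, vec![0, 4]), (5, vec![1])]);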

/// A term's posting list after serialization: either packed inline into the
/// `TermInfo` itself (small lists) or written to the postings file.
enum SerializedPosting {
    Inline(TermInfo),
    External { bytes: Vec<u8>, doc_count: u32 },
}

/// Point-in-time size snapshot of a builder, useful for deciding when to
/// flush a segment.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Distinct (field, term) keys in the inverted index.
    pub unique_terms: usize,
    /// Total postings buffered across all terms.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Entries in the per-document field-length table.
    pub doc_field_lengths_size: usize,
    /// Rough sum of the components in `memory_breakdown`.
    pub estimated_memory_bytes: usize,
    pub memory_breakdown: MemoryBreakdown,
}

/// Where the estimated memory goes, component by component.
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    pub postings_bytes: usize,
    pub index_overhead_bytes: usize,
    pub interner_bytes: usize,
    pub field_lengths_bytes: usize,
    pub dense_vectors_bytes: usize,
    pub dense_vector_count: usize,
}

#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary document store written during indexing.
    pub temp_dir: PathBuf,
    /// Compression level for the final document store.
    pub compression_level: CompressionLevel,
    /// Worker threads for parallel store compression.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Capacity hint for the (field, term) posting map.
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
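
// Illustrative sketch (not part of the original file): overriding a couple
// of defaults while keeping the rest.
//
//     let config = SegmentBuilderConfig {
//         compression_level: CompressionLevel(3),
//         num_compression_threads: 4,
//         ..SegmentBuilderConfig::default()
//     };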

/// Builds one immutable segment from a stream of documents: an in-memory
/// inverted index plus a spill-to-disk document store, flushed to a
/// `Directory` by [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so index keys stay small and `Copy`.
    term_interner: Rodeo,

    /// (field, term) -> postings accumulated so far.
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk store of serialized documents.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    /// Per-field token and document counts.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: one slot per indexed text
    /// field per document.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Scratch map of term frequencies for the text value being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer reused while normalizing tokens.
    token_buffer: String,

    /// Per-field dense vector accumulators.
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Per-field sparse vector accumulators.
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// (field, term) -> token positions, for position-enabled fields.
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that record positions, and in which mode.
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Ordinal of the current multi-valued element per field, reset per document.
    current_element_ordinal: FxHashMap<u32, u32>,
}

/// Collects dense vectors for one field in a flat, row-major buffer.
struct DenseVectorBuilder {
    dim: usize,
    doc_ids: Vec<DocId>,
    vectors: Vec<f32>,
}

impl DenseVectorBuilder {
    fn new(dim: usize) -> Self {
        Self {
            dim,
            doc_ids: Vec::new(),
            vectors: Vec::new(),
        }
    }

    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
        self.doc_ids.push(doc_id);
        self.vectors.extend_from_slice(vector);
    }

    fn len(&self) -> usize {
        self.doc_ids.len()
    }

    /// Re-materializes the flat buffer as one `Vec<f32>` per document.
    fn get_vectors(&self) -> Vec<Vec<f32>> {
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + self.dim].to_vec()
            })
            .collect()
    }

    /// Like `get_vectors`, but keeps only the first `trim_dim` components of
    /// each vector (used when the index dimension is below the stored one).
    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + trim_dim].to_vec()
            })
            .collect()
    }
}
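
// Illustrative sketch (not part of the original file): the flat layout means
// document i's vector occupies `vectors[i * dim .. (i + 1) * dim]`.
//
//     let mut b = DenseVectorBuilder::new(4);
//     b.add(0, &[1.0, 2.0, 3.0, 4.0]);
//     b.add(1, &[5.0, 6.0, 7.0, 8.0]);
//     assert_eq!(b.get_vectors_trimmed(2), vec![vec![1.0, 2.0], vec![5.0, 6.0]]);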

/// Collects sparse vectors for one field as per-dimension postings:
/// `dim_id -> [(doc_id, weight)]`.
struct SparseVectorBuilder {
    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
}

impl SparseVectorBuilder {
    fn new() -> Self {
        Self {
            postings: FxHashMap::default(),
        }
    }

    #[inline]
    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
        self.postings
            .entry(dim_id)
            .or_default()
            .push((doc_id, weight));
    }

    fn is_empty(&self) -> bool {
        self.postings.is_empty()
    }
}
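
// Illustrative sketch (not part of the original file): weights are grouped
// by dimension, not by document.
//
//     let mut b = SparseVectorBuilder::new();
//     b.add(42, 0, 0.5); // dim 42 <- (doc 0, 0.5)
//     b.add(42, 3, 1.5); // dim 42 <- (doc 3, 1.5)
//     b.add(7, 0, 2.0);  // dim 7  <- (doc 0, 2.0)
//     assert_eq!(b.postings[&42], vec![(0, 0.5), (3, 1.5)]);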

impl SegmentBuilder {
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a slot in the per-document
        // field-length table, and remember which fields record positions.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Estimates current memory use from component capacities. The constants
    /// (hash-map entry overhead, average term length) are rough heuristics,
    /// so treat the result as an order-of-magnitude signal, not an exact count.
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| {
                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
            })
            .sum();

        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let hashmap_entry_overhead = 24; // rough per-entry bucket overhead
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_overhead);

        let avg_term_len = 8; // assumed average bytes per interned term
        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * size_of::<DocId>()
                + size_of::<Vec<f32>>()
                + size_of::<Vec<DocId>>();
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_buffer_bytes =
            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }
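
    // Illustrative sketch (not part of the original file): a caller-side
    // flush policy driven by `stats()`; the 256 MiB threshold is made up.
    //
    //     if builder.stats().estimated_memory_bytes > 256 * 1024 * 1024 {
    //         let meta = builder.build(&dir, segment_id).await?;
    //         // ...start a fresh SegmentBuilder for the next segment...
    //     }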

    /// Indexes one document and buffers it in the temporary store. Returns
    /// the document's segment-local id.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's slots in the field-length table.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() || !entry.unwrap().indexed {
                continue;
            }

            let entry = entry.unwrap();
            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    // Each text value of a multi-valued field gets its own
                    // element ordinal for position encoding.
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    self.index_dense_vector_field(*field, doc_id, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    self.index_sparse_vector_field(*field, doc_id, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Tokenizes `text` with the built-in analyzer (whitespace split, keep
    /// alphanumerics, lowercase), accumulates per-term frequencies, and, for
    /// position-enabled fields, encoded token positions. Returns the number
    /// of tokens produced.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        self.local_tf_buffer.clear();

        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();

        let mut token_position = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            if let Some(mode) = position_mode {
                // Pack the element ordinal into the high bits and the token
                // position into the low 20 bits, depending on the mode.
                let encoded_pos = match mode {
                    PositionMode::Ordinal => element_ordinal << 20,
                    PositionMode::TokenPosition => token_position,
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Fold the local buffers into the segment-wide indexes.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            if position_mode.is_some()
                && let Some(positions) = local_positions.get(&term_spur)
            {
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
            }
        }

        Ok(token_position)
    }
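
    // Worked example of the position encoding above (illustrative only):
    // with `PositionMode::Full`, the third token (token_position = 2) of the
    // second text value (element_ordinal = 1) encodes as
    //
    //     (1 << 20) | 2 == 0x0010_0002
    //
    // which a reader can split back apart with `pos >> 20` (ordinal) and
    // `pos & 0xF_FFFF` (token position). Token positions are assumed to stay
    // below 2^20 per element.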

    /// Indexes a numeric value as the synthetic term `__num_{value}`, where
    /// `value` is the u64 transform of the original (i64 bit-cast, f64
    /// `to_bits`). Queries must apply the same transform to match.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }
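
    // Illustrative sketch (not part of the original file): the synthetic
    // term a point query would have to look up for an f64 value.
    //
    //     let v: f64 = 2.5;
    //     let term = format!("__num_{}", v.to_bits());
    //     // term == "__num_4612811918334230528"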

    /// Buffers a dense vector, creating the per-field builder on first use.
    /// All vectors for a field must share the dimension of the first one.
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }

    /// Buffers a sparse vector, dropping entries whose absolute weight falls
    /// below the field's configured threshold (0.0, i.e. keep everything,
    /// when unconfigured).
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            builder.add(dim_id, doc_id, weight);
        }

        Ok(())
    }
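
    // Illustrative sketch (not part of the original file): with a
    // weight_threshold of 0.1, near-zero entries never reach the builder.
    //
    //     let entries = [(3, 0.90_f32), (17, 0.05), (42, -0.30)];
    //     // kept: (3, 0.90) and (42, -0.30); dropped: (17, 0.05)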

    /// Appends the serialized document to the temporary store as a
    /// `[u32 little-endian length][payload]` record.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }
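
    // Record framing sketch (illustrative only): a 7-byte payload is stored
    // as 11 bytes on disk, and `build_store_parallel` walks these length
    // prefixes to recover the document boundaries.
    //
    //     [07 00 00 00][b0 b1 b2 b3 b4 b5 b6]
    //      ^ len = 7     ^ serialized document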

    /// Finalizes the segment: serializes postings, positions, the document
    /// store, and any vector indexes, writes them through `dir`, and returns
    /// the segment metadata. Consumes the builder.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let (positions_data, position_offsets) = self.build_positions_file()?;

        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Postings serialization and store compression are independent, so
        // run them on separate Rayon tasks.
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(&position_offsets),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

    /// Serializes one vector index per dense-vector field.
    ///
    /// Layout: `[num_fields: u32]`, then per field a fixed 21-byte entry
    /// `[field_id: u32][index_type: u8][offset: u64][len: u64]` (offsets are
    /// absolute within the file), then the concatenated index blobs.
    /// `index_type` is 0 for RaBitQ, 1 for IVF-RaBitQ, 2 for IVF-PQ (ScaNN).
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            // Trim stored vectors down to the configured index dimension.
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
                Some(VectorIndexType::ScaNN) => {
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
                        })?;
                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
                    })?;

                    let coarse_centroids = crate::structures::CoarseCentroids::load(
                        std::path::Path::new(centroids_path),
                    )
                    .map_err(crate::Error::Io)?;

                    let pq_codebook =
                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
                            .map_err(crate::Error::Io)?;

                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
                        .with_store_raw(config.store_raw);

                    let ivfpq_index = crate::structures::IVFPQIndex::build(
                        ivfpq_config,
                        &coarse_centroids,
                        &pq_codebook,
                        &vectors,
                        Some(doc_ids.as_slice()),
                    );

                    let bytes = ivfpq_index
                        .to_bytes()
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (2u8, bytes) // index_type 2: IVF-PQ (ScaNN)
                }
                Some(VectorIndexType::IvfRaBitQ) => {
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
                        })?;

                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
                        centroids_path,
                    )) {
                        Ok(coarse_centroids) => {
                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
                                .with_store_raw(config.store_raw);
                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
                                crate::structures::RaBitQConfig::new(index_dim),
                            );
                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_cfg,
                                &coarse_centroids,
                                &rabitq_codebook,
                                &vectors,
                                Some(doc_ids.as_slice()),
                            );
                            let bytes = ivf_index
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (1u8, bytes) // index_type 1: IVF-RaBitQ
                        }
                        Err(e) => {
                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                            let bytes = serde_json::to_vec(&idx)
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (0u8, bytes) // index_type 0: flat RaBitQ fallback
                        }
                    }
                }
                _ => {
                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
                    let bytes = serde_json::to_vec(&idx)
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (0u8, bytes) // index_type 0: flat RaBitQ (default)
                }
            };

            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        field_indexes.sort_by_key(|(id, _, _)| *id);

        // Fixed-size directory: 4-byte count plus 21 bytes per field entry.
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }
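
    // File-layout sketch for a single-field vectors file (illustrative;
    // the 332-byte blob length is made up):
    //
    //     [01 00 00 00]            num_fields = 1
    //     [02 00 00 00][00]        field_id = 2, index_type = 0 (RaBitQ)
    //     [19 00 .. 00][4c 01 ..]  offset = 25 (4 + 21), len = 332
    //     [ ...332-byte RaBitQ blob... ]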

    /// Serializes the sparse-vector postings, one block posting list per
    /// (field, dimension).
    ///
    /// Layout: `[num_fields: u32]`, then per field `[field_id: u32]
    /// [quantization: u8][num_dims: u32]` followed by a dense lookup table of
    /// `num_dims` `[offset: u64][len: u32]` entries (zeroed for absent
    /// dimensions, offsets absolute within the file), then the concatenated
    /// serialized posting lists.
    fn build_sparse_file(&self) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if self.sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        // (field_id, quantization, num_dims, dim_id -> serialized postings)
        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in &self.sparse_vectors {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let quantization = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref())
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in &builder.postings {
                let mut sorted_postings = postings.clone();
                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);

                let block_list =
                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
                        .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;

                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header: global count, then per field a fixed prefix plus a
        // 12-byte (offset, len) table entry per dimension.
        let mut header_size = 4u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += 4 + 1 + 4; // field_id + quantization + num_dims
            header_size += (*num_dims as u64) * 12; // (u64, u32) per dimension
        }

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;

        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        for (_, _, num_dims, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *num_dims as usize];

            for dim_id in 0..*num_dims {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }

            field_tables.push(table);
        }

        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*num_dims)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        Ok(output)
    }
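
    // Sizing sketch (illustrative): one field whose largest dim_id is 9
    // reserves a table of 10 entries, so the header occupies
    //
    //     4 + (4 + 1 + 4) + 10 * 12 == 133 bytes
    //
    // and the first posting list starts at absolute offset 133. Sparse
    // dimension-id spaces therefore cost 12 bytes per unused slot.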

    /// Serializes position posting lists, sorted by key, and returns the
    /// file bytes together with a `key -> (offset, len)` map. Keys are
    /// `field_id` as 4 little-endian bytes followed by the UTF-8 term, the
    /// same key format `build_postings` uses for the term dictionary.
    #[allow(clippy::type_complexity)]
    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        if self.position_index.is_empty() {
            return Ok((Vec::new(), position_offsets));
        }

        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
            .position_index
            .iter()
            .map(|(term_key, pos_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_list)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut output = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in &pos_builder.postings {
                pos_list.push(*doc_id, positions.clone());
            }

            let offset = output.len() as u64;
            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
            let len = (output.len() as u64 - offset) as u32;

            position_offsets.insert(key, (offset, len));
        }

        Ok((output, position_offsets))
    }
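
    // Key-format sketch (illustrative): field 1, term "rust" produces the
    // 8-byte key
    //
    //     [01 00 00 00] ++ "rust"  ->  01 00 00 00 72 75 73 74
    //
    // so a term's positions can be looked up with exactly the same key the
    // term dictionary is built on.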

    /// Builds the term dictionary (an SSTable of `TermInfo`) and the
    /// postings file. Posting lists are serialized in parallel; small lists
    /// without positions are inlined directly into their `TermInfo`.
    fn build_postings(
        &mut self,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // SSTable insertion requires sorted keys.
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                // Terms with positions always go external so the TermInfo
                // can carry the position offsets.
                let has_positions = position_offsets.contains_key(&key);
                let result = if !has_positions
                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
                {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);

                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            posting_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(posting_offset, posting_len, doc_count)
                    }
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }
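
    // Decision sketch (illustrative): how a term's postings end up stored.
    //
    //     positions recorded for the term? ──yes──> External (+ position offsets)
    //              │no
    //     TermInfo::try_inline succeeds?   ──yes──> Inline (packed in term dict)
    //              │no
    //              └───────────────────────────────> External (block posting list)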

    /// Re-reads the temporary store, decodes the length-prefixed documents,
    /// and recompresses them into the final store format using the parallel
    /// store writer.
    fn build_store_parallel(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Walk the `[u32 len][payload]` records to find document boundaries.
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);
    }
}
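
// End-to-end usage sketch (illustrative; `schema`, `doc`, `dir`, and
// `segment_id` are assumed to be constructed by the caller):
//
//     let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//     builder.add_document(doc)?;
//     let meta = builder.build(&dir, segment_id).await?;
//     assert_eq!(meta.num_docs, 1);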