use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

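/// Write-buffer size for the temporary on-disk document store (16 MiB).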
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

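/// A single (document, term-frequency) pair. The frequency is stored as a
/// saturating `u16` to keep the in-memory postings compact.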
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

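    /// Appends a posting. Doc ids arrive in increasing order, so if the same
    /// document adds the same term again, the frequencies are merged into the
    /// last entry (saturating at `u16::MAX`).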
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

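/// Outcome of serializing one posting list: small lists are inlined into the
/// term dictionary entry itself, larger ones go to the external postings file.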
enum SerializedPosting {
    Inline(TermInfo),
    External { bytes: Vec<u8>, doc_count: u32 },
}

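/// Snapshot of the builder's in-memory state, including a heuristic estimate
/// of its current memory footprint.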
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    pub num_docs: u32,
    pub unique_terms: usize,
    pub postings_in_memory: usize,
    pub interned_strings: usize,
    pub doc_field_lengths_size: usize,
    pub estimated_memory_bytes: usize,
    pub memory_breakdown: MemoryBreakdown,
}

#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    pub postings_bytes: usize,
    pub index_overhead_bytes: usize,
    pub interner_bytes: usize,
    pub field_lengths_bytes: usize,
    pub dense_vectors_bytes: usize,
    pub dense_vector_count: usize,
}

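/// Tuning knobs for [`SegmentBuilder`]. A minimal usage sketch (values are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = SegmentBuilderConfig {
///     num_compression_threads: 4,
///     ..Default::default()
/// };
/// ```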
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    pub temp_dir: PathBuf,
    pub compression_level: CompressionLevel,
    pub num_compression_threads: usize,
    pub interner_capacity: usize,
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

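/// Builds a single immutable segment. Documents stream into an in-memory
/// inverted index (interned terms mapped to compact postings) while their
/// stored form is spilled to a temporary file; `build` then serializes the
/// term dictionary, postings, document store, and any vector indexes.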
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    term_interner: Rodeo,

    inverted_index: HashMap<TermKey, PostingListBuilder>,

    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    field_stats: FxHashMap<u32, FieldStats>,

    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    local_tf_buffer: FxHashMap<Spur, u32>,

    token_buffer: String,

    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
}

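/// Column-major accumulator for one dense-vector field: doc ids in one vec,
/// all vectors concatenated end-to-end (`vectors.len() == doc_ids.len() * dim`)
/// in another.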
struct DenseVectorBuilder {
    dim: usize,
    doc_ids: Vec<DocId>,
    vectors: Vec<f32>,
}

impl DenseVectorBuilder {
    fn new(dim: usize) -> Self {
        Self {
            dim,
            doc_ids: Vec::new(),
            vectors: Vec::new(),
        }
    }

    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
        self.doc_ids.push(doc_id);
        self.vectors.extend_from_slice(vector);
    }

    fn len(&self) -> usize {
        self.doc_ids.len()
    }

    fn get_vectors(&self) -> Vec<Vec<f32>> {
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + self.dim].to_vec()
            })
            .collect()
    }

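    /// Like `get_vectors`, but keeps only the first `trim_dim` components of
    /// each vector (used when the index is built on a prefix of the stored
    /// dimensionality).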
    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + trim_dim].to_vec()
            })
            .collect()
    }
}

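/// Accumulates sparse-vector postings inverted by dimension:
/// `dim_id -> [(doc_id, weight)]`.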
struct SparseVectorBuilder {
    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
}

impl SparseVectorBuilder {
    fn new() -> Self {
        Self {
            postings: FxHashMap::default(),
        }
    }

    #[inline]
    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
        self.postings
            .entry(dim_id)
            .or_default()
            .push((doc_id, weight));
    }

    fn is_empty(&self) -> bool {
        self.postings.is_empty()
    }
}

impl SegmentBuilder {
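    /// Creates a fresh builder and opens its temporary store file.
    ///
    /// A minimal end-to-end sketch (the surrounding schema, document, and
    /// directory setup is illustrative, not this crate's exact API):
    ///
    /// ```ignore
    /// let mut builder = SegmentBuilder::new(schema.clone(), SegmentBuilderConfig::default())?;
    /// let doc_id = builder.add_document(doc)?;
    /// let meta = builder.build(&dir, segment_id).await?;
    /// ```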
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

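    /// Estimates the builder's current memory footprint. All figures are
    /// heuristic: capacities multiplied by element sizes plus rough per-entry
    /// overheads, not exact allocator measurements.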
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        let compact_posting_size = size_of::<CompactPosting>();

        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| {
                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
            })
            .sum();

        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let hashmap_entry_overhead = 24; // rough per-entry hash table overhead
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_overhead);

        let avg_term_len = 8; // assumed average interned term length in bytes
        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);

        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();

        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * size_of::<DocId>()
                + size_of::<Vec<f32>>()
                + size_of::<Vec<DocId>>();
            dense_vector_count += b.doc_ids.len();
        }

        let local_tf_buffer_bytes =
            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

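    /// Indexes one document: updates postings, per-field token-length slots,
    /// and vector builders, then appends the document's stored form to the
    /// temporary store file. Returns the document's id within this segment.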
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    self.index_dense_vector_field(*field, doc_id, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    self.index_sparse_vector_field(*field, doc_id, entries)?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

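    /// Tokenizes `text` with a fast inline pipeline (split on whitespace, keep
    /// alphanumeric characters, lowercase), aggregates term frequencies for
    /// this document in `local_tf_buffer`, then folds them into the inverted
    /// index. Returns the token count for field-length statistics.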
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

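    /// Indexes a numeric value as the synthetic term `__num_<value>`, so exact
    /// numeric matches reuse the text posting machinery. `f64` values arrive
    /// here as their IEEE-754 bit pattern, `i64` values as a bit-cast `u64`.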
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }

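    /// Adds a document's sparse vector, dropping entries whose absolute weight
    /// falls below the field's configured `weight_threshold` (default 0.0,
    /// i.e. keep everything).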
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            if weight.abs() < weight_threshold {
                continue;
            }

            builder.add(dim_id, doc_id, weight);
        }

        Ok(())
    }

    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

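    /// Finalizes the segment. Postings serialization and store recompression
    /// run in parallel via `rayon::join`; each artifact is then written to
    /// `dir` and the temporary store file is removed.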
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        let (postings_result, store_result) = rayon::join(
            || self.build_postings(),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

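    /// Serializes all dense-vector indexes into a single file.
    ///
    /// Layout: `[num_fields: u32]`, then one header entry per field
    /// (`field_id: u32`, `index_type: u8`, `offset: u64`, `len: u64`),
    /// followed by the concatenated index blobs. Index type codes:
    /// 0 = RaBitQ, 1 = IVF-RaBitQ, 2 = ScaNN (IVF-PQ).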
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
                Some(VectorIndexType::ScaNN) => {
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
                        })?;
                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
                    })?;

                    let coarse_centroids = crate::structures::CoarseCentroids::load(
                        std::path::Path::new(centroids_path),
                    )
                    .map_err(crate::Error::Io)?;

                    let pq_codebook =
                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
                            .map_err(crate::Error::Io)?;

                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
                        .with_store_raw(config.store_raw);

                    let ivfpq_index = crate::structures::IVFPQIndex::build(
                        ivfpq_config,
                        &coarse_centroids,
                        &pq_codebook,
                        &vectors,
                        Some(doc_ids.as_slice()),
                    );

                    let bytes = ivfpq_index
                        .to_bytes()
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (2u8, bytes) // index type 2 = ScaNN (IVF-PQ)
                }
                Some(VectorIndexType::IvfRaBitQ) => {
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
                        })?;

                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
                        centroids_path,
                    )) {
                        Ok(coarse_centroids) => {
                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
                                .with_store_raw(config.store_raw);
                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
                                crate::structures::RaBitQConfig::new(index_dim),
                            );
                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_cfg,
                                &coarse_centroids,
                                &rabitq_codebook,
                                &vectors,
                                Some(doc_ids.as_slice()),
                            );
                            let bytes = ivf_index
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (1u8, bytes) // index type 1 = IVF-RaBitQ
                        }
                        Err(e) => {
                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                            let bytes = serde_json::to_vec(&idx)
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (0u8, bytes) // index type 0 = RaBitQ (fallback)
                        }
                    }
                }
                _ => {
                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
                    let bytes = serde_json::to_vec(&idx)
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (0u8, bytes) // index type 0 = RaBitQ (default)
                }
            };

            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        field_indexes.sort_by_key(|(id, _, _)| *id);

        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }

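    /// Serializes all sparse-vector fields into a single file.
    ///
    /// Layout: `[num_fields: u32]`, then per field a header
    /// (`field_id: u32`, `quantization: u8`, `num_dims: u32`) followed by a
    /// dense per-dimension table of (`offset: u64`, `len: u32`) entries
    /// (12 bytes each; zeros mean "dimension absent"), then the concatenated
    /// block posting lists.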
    fn build_sparse_file(&self) -> Result<Vec<u8>> {
        use crate::structures::{BlockSparsePostingList, WeightQuantization};
        use byteorder::{LittleEndian, WriteBytesExt};

        if self.sparse_vectors.is_empty() {
            return Ok(Vec::new());
        }

        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (&field_id, builder) in &self.sparse_vectors {
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            let quantization = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref())
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for (&dim_id, postings) in &builder.postings {
                let mut sorted_postings = postings.clone();
                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);

                let block_list =
                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
                        .map_err(crate::Error::Io)?;

                let mut bytes = Vec::new();
                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;

                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(Vec::new());
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            header_size += 4 + 1 + 4; // field_id + quantization + num_dims
            header_size += (*max_dim_id as u64) * 12; // per-dim (offset: u64, len: u32)
        }

        let mut output = Vec::new();

        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;

        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        for (_, _, max_dim_id, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }

            field_tables.push(table);
        }

        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        Ok(output)
    }

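    /// Serializes the inverted index into a term dictionary (SSTable) and a
    /// postings file. Keys are the field id (4 LE bytes) followed by the raw
    /// term bytes; entries are sorted and then serialized in parallel, with
    /// small posting lists inlined into their `TermInfo` and the rest written
    /// as block posting lists.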
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);
                    TermInfo::external(posting_offset, posting_len, doc_count)
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

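    /// Re-reads the temporary store file via mmap, splits it back into
    /// length-prefixed documents, deserializes them in parallel, and
    /// recompresses everything into the final store format.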
    fn build_store_parallel(
        store_path: &PathBuf,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

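// Best-effort cleanup: if the builder is dropped without `build` completing,
// remove the temporary store file.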
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        let _ = std::fs::remove_file(&self.store_path);
    }
}