hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14
15use hashbrown::HashMap;
16use lasso::{Rodeo, Spur};
17use rayon::prelude::*;
18use rustc_hash::FxHashMap;
19
20use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
21use crate::compression::CompressionLevel;
22use crate::directories::{Directory, DirectoryWriter};
23use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
24use crate::structures::{PostingList, SSTableWriter, TermInfo};
25use crate::tokenizer::BoxedTokenizer;
26use crate::{DocId, Result};
27
/// Capacity, in bytes, of the write buffer placed in front of the temporary
/// document store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 << 20;
30
31/// Interned term key combining field and term
32#[derive(Clone, Copy, PartialEq, Eq, Hash)]
33struct TermKey {
34    field: u32,
35    term: Spur,
36}
37
38/// Compact posting entry for in-memory storage
39#[derive(Clone, Copy)]
40struct CompactPosting {
41    doc_id: DocId,
42    term_freq: u16,
43}
44
45/// In-memory posting list for a term
46struct PostingListBuilder {
47    /// In-memory postings
48    postings: Vec<CompactPosting>,
49}
50
51impl PostingListBuilder {
52    fn new() -> Self {
53        Self {
54            postings: Vec::new(),
55        }
56    }
57
58    /// Add a posting, merging if same doc_id as last
59    #[inline]
60    fn add(&mut self, doc_id: DocId, term_freq: u32) {
61        // Check if we can merge with the last posting
62        if let Some(last) = self.postings.last_mut()
63            && last.doc_id == doc_id
64        {
65            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
66            return;
67        }
68        self.postings.push(CompactPosting {
69            doc_id,
70            term_freq: term_freq.min(u16::MAX as u32) as u16,
71        });
72    }
73
74    fn len(&self) -> usize {
75        self.postings.len()
76    }
77}
78
79/// Intermediate result for parallel posting serialization
80enum SerializedPosting {
81    /// Inline posting (small enough to fit in TermInfo)
82    Inline(TermInfo),
83    /// External posting with serialized bytes
84    External { bytes: Vec<u8>, doc_count: u32 },
85}
86
87/// Statistics for debugging segment builder performance
88#[derive(Debug, Clone)]
89pub struct SegmentBuilderStats {
90    /// Number of documents indexed
91    pub num_docs: u32,
92    /// Number of unique terms in the inverted index
93    pub unique_terms: usize,
94    /// Total postings in memory (across all terms)
95    pub postings_in_memory: usize,
96    /// Number of interned strings
97    pub interned_strings: usize,
98    /// Size of doc_field_lengths vector
99    pub doc_field_lengths_size: usize,
100    /// Estimated total memory usage in bytes
101    pub estimated_memory_bytes: usize,
102    /// Memory breakdown by component
103    pub memory_breakdown: MemoryBreakdown,
104}
105
/// Per-component split of the builder's estimated memory usage.
///
/// All byte figures are estimates; `Default` yields an all-zero breakdown.
#[derive(Clone, Debug, Default)]
pub struct MemoryBreakdown {
    /// Bytes attributed to the posting vectors.
    pub postings_bytes: usize,
    /// Bytes attributed to the inverted-index hash map entries.
    pub index_overhead_bytes: usize,
    /// Bytes attributed to the term interner.
    pub interner_bytes: usize,
    /// Bytes attributed to the per-document field-length vector.
    pub field_lengths_bytes: usize,
    /// Bytes attributed to dense vector storage.
    pub dense_vectors_bytes: usize,
    /// Number of dense vectors currently stored.
    pub dense_vector_count: usize,
}
122
123/// Configuration for segment builder
124#[derive(Clone)]
125pub struct SegmentBuilderConfig {
126    /// Directory for temporary spill files
127    pub temp_dir: PathBuf,
128    /// Compression level for document store
129    pub compression_level: CompressionLevel,
130    /// Number of threads for parallel compression
131    pub num_compression_threads: usize,
132    /// Initial capacity for term interner
133    pub interner_capacity: usize,
134    /// Initial capacity for posting lists hashmap
135    pub posting_map_capacity: usize,
136}
137
138impl Default for SegmentBuilderConfig {
139    fn default() -> Self {
140        Self {
141            temp_dir: std::env::temp_dir(),
142            compression_level: CompressionLevel(7),
143            num_compression_threads: num_cpus::get(),
144            interner_capacity: 1_000_000,
145            posting_map_capacity: 500_000,
146        }
147    }
148}
149
150/// Segment builder with optimized memory usage
151///
152/// Features:
153/// - Streams documents to disk immediately (no in-memory document storage)
154/// - Uses string interning for terms (reduced allocations)
155/// - Uses hashbrown HashMap (faster than BTreeMap)
156pub struct SegmentBuilder {
157    schema: Schema,
158    config: SegmentBuilderConfig,
159    tokenizers: FxHashMap<Field, BoxedTokenizer>,
160
161    /// String interner for terms - O(1) lookup and deduplication
162    term_interner: Rodeo,
163
164    /// Inverted index: term key -> posting list
165    inverted_index: HashMap<TermKey, PostingListBuilder>,
166
167    /// Streaming document store writer
168    store_file: BufWriter<File>,
169    store_path: PathBuf,
170
171    /// Document count
172    next_doc_id: DocId,
173
174    /// Per-field statistics for BM25F
175    field_stats: FxHashMap<u32, FieldStats>,
176
177    /// Per-document field lengths stored compactly
178    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
179    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
180    doc_field_lengths: Vec<u32>,
181    num_indexed_fields: usize,
182    field_to_slot: FxHashMap<u32, usize>,
183
184    /// Reusable buffer for per-document term frequency aggregation
185    /// Avoids allocating a new hashmap for each document
186    local_tf_buffer: FxHashMap<Spur, u32>,
187
188    /// Reusable buffer for tokenization to avoid per-token String allocations
189    token_buffer: String,
190
191    /// Dense vector storage per field: field -> (doc_ids, vectors)
192    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
193    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
194
195    /// Sparse vector storage per field: field -> SparseVectorBuilder
196    /// Uses proper BlockSparsePostingList with configurable quantization
197    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
198}
199
200/// Builder for dense vector index using RaBitQ
201struct DenseVectorBuilder {
202    /// Dimension of vectors
203    dim: usize,
204    /// Document IDs with vectors
205    doc_ids: Vec<DocId>,
206    /// Flat vector storage (doc_ids.len() * dim floats)
207    vectors: Vec<f32>,
208}
209
210impl DenseVectorBuilder {
211    fn new(dim: usize) -> Self {
212        Self {
213            dim,
214            doc_ids: Vec::new(),
215            vectors: Vec::new(),
216        }
217    }
218
219    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
220        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
221        self.doc_ids.push(doc_id);
222        self.vectors.extend_from_slice(vector);
223    }
224
225    fn len(&self) -> usize {
226        self.doc_ids.len()
227    }
228
229    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
230    fn get_vectors(&self) -> Vec<Vec<f32>> {
231        self.doc_ids
232            .iter()
233            .enumerate()
234            .map(|(i, _)| {
235                let start = i * self.dim;
236                self.vectors[start..start + self.dim].to_vec()
237            })
238            .collect()
239    }
240
241    /// Get vectors trimmed to specified dimension for matryoshka/MRL indexing
242    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
243        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
244        self.doc_ids
245            .iter()
246            .enumerate()
247            .map(|(i, _)| {
248                let start = i * self.dim;
249                self.vectors[start..start + trim_dim].to_vec()
250            })
251            .collect()
252    }
253}
254
255/// Builder for sparse vector index using BlockSparsePostingList
256///
257/// Collects (doc_id, weight) postings per dimension, then builds
258/// BlockSparsePostingList with proper quantization during commit.
259struct SparseVectorBuilder {
260    /// Postings per dimension: dim_id -> Vec<(doc_id, weight)>
261    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
262}
263
264impl SparseVectorBuilder {
265    fn new() -> Self {
266        Self {
267            postings: FxHashMap::default(),
268        }
269    }
270
271    /// Add a sparse vector entry
272    #[inline]
273    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
274        self.postings
275            .entry(dim_id)
276            .or_default()
277            .push((doc_id, weight));
278    }
279
280    fn is_empty(&self) -> bool {
281        self.postings.is_empty()
282    }
283}
284
285impl SegmentBuilder {
286    /// Create a new segment builder
287    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
288        let segment_id = uuid::Uuid::new_v4();
289        let store_path = config
290            .temp_dir
291            .join(format!("hermes_store_{}.tmp", segment_id));
292
293        let store_file = BufWriter::with_capacity(
294            STORE_BUFFER_SIZE,
295            OpenOptions::new()
296                .create(true)
297                .write(true)
298                .truncate(true)
299                .open(&store_path)?,
300        );
301
302        // Count indexed fields for compact field length storage
303        let mut num_indexed_fields = 0;
304        let mut field_to_slot = FxHashMap::default();
305        for (field, entry) in schema.fields() {
306            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
307                field_to_slot.insert(field.0, num_indexed_fields);
308                num_indexed_fields += 1;
309            }
310        }
311
312        Ok(Self {
313            schema,
314            tokenizers: FxHashMap::default(),
315            term_interner: Rodeo::new(),
316            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
317            store_file,
318            store_path,
319            next_doc_id: 0,
320            field_stats: FxHashMap::default(),
321            doc_field_lengths: Vec::new(),
322            num_indexed_fields,
323            field_to_slot,
324            local_tf_buffer: FxHashMap::default(),
325            token_buffer: String::with_capacity(64),
326            config,
327            dense_vectors: FxHashMap::default(),
328            sparse_vectors: FxHashMap::default(),
329        })
330    }
331
332    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
333        self.tokenizers.insert(field, tokenizer);
334    }
335
336    pub fn num_docs(&self) -> u32 {
337        self.next_doc_id
338    }
339
340    /// Get current statistics for debugging performance
341    pub fn stats(&self) -> SegmentBuilderStats {
342        use std::mem::size_of;
343
344        let postings_in_memory: usize =
345            self.inverted_index.values().map(|p| p.postings.len()).sum();
346
347        // Precise memory calculation using actual struct sizes
348        // CompactPosting: doc_id (u32) + term_freq (u16) = 6 bytes, but may have padding
349        let compact_posting_size = size_of::<CompactPosting>();
350
351        // Postings: actual Vec capacity * element size + Vec overhead (24 bytes on 64-bit)
352        let postings_bytes: usize = self
353            .inverted_index
354            .values()
355            .map(|p| {
356                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
357            })
358            .sum();
359
360        // Inverted index: HashMap overhead per entry
361        // Each entry: TermKey (field u32 + Spur 4 bytes = 8 bytes) + PostingListBuilder + HashMap bucket overhead
362        // HashMap typically uses ~1.5x capacity, each bucket ~16-24 bytes
363        let term_key_size = size_of::<TermKey>();
364        let posting_builder_size = size_of::<PostingListBuilder>();
365        let hashmap_entry_overhead = 24; // bucket pointer + metadata
366        let index_overhead_bytes = self.inverted_index.len()
367            * (term_key_size + posting_builder_size + hashmap_entry_overhead);
368
369        // Term interner: Rodeo stores strings + metadata
370        // Each interned string: actual string bytes + Spur (4 bytes) + internal overhead (~16 bytes)
371        // We can't get exact string lengths, so estimate average term length of 8 bytes
372        let avg_term_len = 8;
373        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
374        let interner_bytes =
375            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);
376
377        // Doc field lengths: Vec<u32> with capacity
378        let field_lengths_bytes =
379            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
380
381        // Dense vectors: actual capacity used
382        let mut dense_vectors_bytes: usize = 0;
383        let mut dense_vector_count: usize = 0;
384        for b in self.dense_vectors.values() {
385            // vectors: Vec<f32> capacity + doc_ids: Vec<DocId> capacity
386            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
387                + b.doc_ids.capacity() * size_of::<DocId>()
388                + size_of::<Vec<f32>>()
389                + size_of::<Vec<DocId>>();
390            dense_vector_count += b.doc_ids.len();
391        }
392
393        // Local buffers
394        let local_tf_buffer_bytes =
395            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);
396
397        let estimated_memory_bytes = postings_bytes
398            + index_overhead_bytes
399            + interner_bytes
400            + field_lengths_bytes
401            + dense_vectors_bytes
402            + local_tf_buffer_bytes;
403
404        let memory_breakdown = MemoryBreakdown {
405            postings_bytes,
406            index_overhead_bytes,
407            interner_bytes,
408            field_lengths_bytes,
409            dense_vectors_bytes,
410            dense_vector_count,
411        };
412
413        SegmentBuilderStats {
414            num_docs: self.next_doc_id,
415            unique_terms: self.inverted_index.len(),
416            postings_in_memory,
417            interned_strings: self.term_interner.len(),
418            doc_field_lengths_size: self.doc_field_lengths.len(),
419            estimated_memory_bytes,
420            memory_breakdown,
421        }
422    }
423
424    /// Add a document - streams to disk immediately
425    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
426        let doc_id = self.next_doc_id;
427        self.next_doc_id += 1;
428
429        // Initialize field lengths for this document
430        let base_idx = self.doc_field_lengths.len();
431        self.doc_field_lengths
432            .resize(base_idx + self.num_indexed_fields, 0);
433
434        for (field, value) in doc.field_values() {
435            let entry = self.schema.get_field_entry(*field);
436            if entry.is_none() || !entry.unwrap().indexed {
437                continue;
438            }
439
440            let entry = entry.unwrap();
441            match (&entry.field_type, value) {
442                (FieldType::Text, FieldValue::Text(text)) => {
443                    let token_count = self.index_text_field(*field, doc_id, text)?;
444
445                    // Update field statistics
446                    let stats = self.field_stats.entry(field.0).or_default();
447                    stats.total_tokens += token_count as u64;
448                    stats.doc_count += 1;
449
450                    // Store field length compactly
451                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
452                        self.doc_field_lengths[base_idx + slot] = token_count;
453                    }
454                }
455                (FieldType::U64, FieldValue::U64(v)) => {
456                    self.index_numeric_field(*field, doc_id, *v)?;
457                }
458                (FieldType::I64, FieldValue::I64(v)) => {
459                    self.index_numeric_field(*field, doc_id, *v as u64)?;
460                }
461                (FieldType::F64, FieldValue::F64(v)) => {
462                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
463                }
464                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
465                    self.index_dense_vector_field(*field, doc_id, vec)?;
466                }
467                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
468                    self.index_sparse_vector_field(*field, doc_id, entries)?;
469                }
470                _ => {}
471            }
472        }
473
474        // Stream document to disk immediately
475        self.write_document_to_store(&doc)?;
476
477        Ok(doc_id)
478    }
479
480    /// Index a text field using interned terms
481    ///
482    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
483    /// Instead of allocating a String per token, we:
484    /// 1. Iterate over whitespace-split words
485    /// 2. Build lowercase token in a reusable buffer
486    /// 3. Intern directly from the buffer
487    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
488        // Phase 1: Aggregate term frequencies within this document
489        // Reuse buffer to avoid allocations
490        self.local_tf_buffer.clear();
491
492        let mut token_count = 0u32;
493
494        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
495        for word in text.split_whitespace() {
496            // Build lowercase token in reusable buffer
497            self.token_buffer.clear();
498            for c in word.chars() {
499                if c.is_alphanumeric() {
500                    for lc in c.to_lowercase() {
501                        self.token_buffer.push(lc);
502                    }
503                }
504            }
505
506            if self.token_buffer.is_empty() {
507                continue;
508            }
509
510            token_count += 1;
511
512            // Intern the term directly from buffer - O(1) amortized
513            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
514            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
515        }
516
517        // Phase 2: Insert aggregated terms into inverted index
518        // Now we only do one inverted_index lookup per unique term in doc
519        let field_id = field.0;
520
521        for (&term_spur, &tf) in &self.local_tf_buffer {
522            let term_key = TermKey {
523                field: field_id,
524                term: term_spur,
525            };
526
527            let posting = self
528                .inverted_index
529                .entry(term_key)
530                .or_insert_with(PostingListBuilder::new);
531            posting.add(doc_id, tf);
532        }
533
534        Ok(token_count)
535    }
536
537    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
538        // For numeric fields, we use a special encoding
539        let term_str = format!("__num_{}", value);
540        let term_spur = self.term_interner.get_or_intern(&term_str);
541
542        let term_key = TermKey {
543            field: field.0,
544            term: term_spur,
545        };
546
547        let posting = self
548            .inverted_index
549            .entry(term_key)
550            .or_insert_with(PostingListBuilder::new);
551        posting.add(doc_id, 1);
552
553        Ok(())
554    }
555
556    /// Index a dense vector field
557    fn index_dense_vector_field(
558        &mut self,
559        field: Field,
560        doc_id: DocId,
561        vector: &[f32],
562    ) -> Result<()> {
563        let dim = vector.len();
564
565        let builder = self
566            .dense_vectors
567            .entry(field.0)
568            .or_insert_with(|| DenseVectorBuilder::new(dim));
569
570        // Verify dimension consistency
571        if builder.dim != dim && builder.len() > 0 {
572            return Err(crate::Error::Schema(format!(
573                "Dense vector dimension mismatch: expected {}, got {}",
574                builder.dim, dim
575            )));
576        }
577
578        builder.add(doc_id, vector);
579        Ok(())
580    }
581
582    /// Index a sparse vector field using dedicated sparse posting lists
583    ///
584    /// Collects (doc_id, weight) postings per dimension. During commit, these are
585    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
586    ///
587    /// Weights below the configured `weight_threshold` are not indexed.
588    fn index_sparse_vector_field(
589        &mut self,
590        field: Field,
591        doc_id: DocId,
592        entries: &[(u32, f32)],
593    ) -> Result<()> {
594        // Get weight threshold from field config (default 0.0 = no filtering)
595        let weight_threshold = self
596            .schema
597            .get_field_entry(field)
598            .and_then(|entry| entry.sparse_vector_config.as_ref())
599            .map(|config| config.weight_threshold)
600            .unwrap_or(0.0);
601
602        let builder = self
603            .sparse_vectors
604            .entry(field.0)
605            .or_insert_with(SparseVectorBuilder::new);
606
607        for &(dim_id, weight) in entries {
608            // Skip weights below threshold
609            if weight.abs() < weight_threshold {
610                continue;
611            }
612
613            builder.add(dim_id, doc_id, weight);
614        }
615
616        Ok(())
617    }
618
619    /// Write document to streaming store
620    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
621        use byteorder::{LittleEndian, WriteBytesExt};
622
623        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
624
625        self.store_file
626            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
627        self.store_file.write_all(&doc_bytes)?;
628
629        Ok(())
630    }
631
632    /// Build the final segment
633    pub async fn build<D: Directory + DirectoryWriter>(
634        mut self,
635        dir: &D,
636        segment_id: SegmentId,
637    ) -> Result<SegmentMeta> {
638        // Flush any buffered data
639        self.store_file.flush()?;
640
641        let files = SegmentFiles::new(segment_id.0);
642
643        // Extract data needed for parallel processing
644        let store_path = self.store_path.clone();
645        let schema = self.schema.clone();
646        let num_compression_threads = self.config.num_compression_threads;
647        let compression_level = self.config.compression_level;
648
649        // Build postings and document store in parallel
650        let (postings_result, store_result) = rayon::join(
651            || self.build_postings(),
652            || {
653                Self::build_store_parallel(
654                    &store_path,
655                    &schema,
656                    num_compression_threads,
657                    compression_level,
658                )
659            },
660        );
661
662        let (term_dict_data, postings_data) = postings_result?;
663        let store_data = store_result?;
664
665        // Write to directory
666        dir.write(&files.term_dict, &term_dict_data).await?;
667        dir.write(&files.postings, &postings_data).await?;
668        dir.write(&files.store, &store_data).await?;
669
670        // Build and write dense vector indexes (RaBitQ) - all in one file
671        if !self.dense_vectors.is_empty() {
672            let vectors_data = self.build_vectors_file()?;
673            if !vectors_data.is_empty() {
674                dir.write(&files.vectors, &vectors_data).await?;
675            }
676        }
677
678        // Build and write sparse vector posting lists
679        if !self.sparse_vectors.is_empty() {
680            let sparse_data = self.build_sparse_file()?;
681            if !sparse_data.is_empty() {
682                dir.write(&files.sparse, &sparse_data).await?;
683            }
684        }
685
686        let meta = SegmentMeta {
687            id: segment_id.0,
688            num_docs: self.next_doc_id,
689            field_stats: self.field_stats.clone(),
690        };
691
692        dir.write(&files.meta, &meta.serialize()?).await?;
693
694        // Cleanup temp files
695        let _ = std::fs::remove_file(&self.store_path);
696
697        Ok(meta)
698    }
699
700    /// Build unified vectors file containing all dense vector indexes
701    ///
702    /// File format:
703    /// - Header: num_fields (u32)
704    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
705    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
706    fn build_vectors_file(&self) -> Result<Vec<u8>> {
707        use crate::dsl::VectorIndexType;
708        use byteorder::{LittleEndian, WriteBytesExt};
709
710        // Build all indexes first: (field_id, index_type, data)
711        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
712
713        for (&field_id, builder) in &self.dense_vectors {
714            if builder.len() == 0 {
715                continue;
716            }
717
718            let field = crate::dsl::Field(field_id);
719
720            // Get dense vector config
721            let dense_config = self
722                .schema
723                .get_field_entry(field)
724                .and_then(|e| e.dense_vector_config.as_ref());
725
726            // Get vectors, potentially trimmed for matryoshka/MRL indexing
727            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
728            let vectors = if index_dim < builder.dim {
729                // Trim vectors to mrl_dim for indexing
730                builder.get_vectors_trimmed(index_dim)
731            } else {
732                builder.get_vectors()
733            };
734
735            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
736                Some(VectorIndexType::ScaNN) => {
737                    // ScaNN (IVF-PQ) index
738                    let config = dense_config.unwrap();
739                    let centroids_path =
740                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
741                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
742                        })?;
743                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
744                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
745                    })?;
746
747                    let coarse_centroids = crate::structures::CoarseCentroids::load(
748                        std::path::Path::new(centroids_path),
749                    )
750                    .map_err(crate::Error::Io)?;
751
752                    let pq_codebook =
753                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
754                            .map_err(crate::Error::Io)?;
755
756                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
757                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
758                        .with_store_raw(config.store_raw);
759
760                    let ivfpq_index = crate::structures::IVFPQIndex::build(
761                        ivfpq_config,
762                        &coarse_centroids,
763                        &pq_codebook,
764                        &vectors,
765                        Some(doc_ids.as_slice()),
766                    );
767
768                    // Serialize ScaNN index (IVFPQIndex only - codebook loaded separately)
769                    let bytes = ivfpq_index
770                        .to_bytes()
771                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
772                    (2u8, bytes) // 2 = ScaNN
773                }
774                Some(VectorIndexType::IvfRaBitQ) => {
775                    // IVF-RaBitQ index
776                    let config = dense_config.unwrap();
777                    let centroids_path =
778                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
779                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
780                        })?;
781
782                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
783                        centroids_path,
784                    )) {
785                        Ok(coarse_centroids) => {
786                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
787                                .with_store_raw(config.store_raw);
788                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
789                                crate::structures::RaBitQConfig::new(index_dim),
790                            );
791                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
792                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
793                                ivf_cfg,
794                                &coarse_centroids,
795                                &rabitq_codebook,
796                                &vectors,
797                                Some(doc_ids.as_slice()),
798                            );
799                            let bytes = ivf_index
800                                .to_bytes()
801                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
802                            (1u8, bytes) // 1 = IVF-RaBitQ
803                        }
804                        Err(e) => {
805                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
806                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
807                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
808                            let bytes = serde_json::to_vec(&idx)
809                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
810                            (0u8, bytes) // 0 = RaBitQ
811                        }
812                    }
813                }
814                _ => {
815                    // Default: RaBitQ
816                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
817                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
818                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
819                    let bytes = serde_json::to_vec(&idx)
820                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
821                    (0u8, bytes) // 0 = RaBitQ
822                }
823            };
824
825            field_indexes.push((field_id, index_type, index_bytes));
826        }
827
828        if field_indexes.is_empty() {
829            return Ok(Vec::new());
830        }
831
832        // Sort by field_id for consistent ordering
833        field_indexes.sort_by_key(|(id, _, _)| *id);
834
835        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
836        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
837
838        // Build output
839        let mut output = Vec::new();
840
841        // Write number of fields
842        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
843
844        // Calculate offsets and write header entries
845        let mut current_offset = header_size as u64;
846        for (field_id, index_type, data) in &field_indexes {
847            output.write_u32::<LittleEndian>(*field_id)?;
848            output.write_u8(*index_type)?;
849            output.write_u64::<LittleEndian>(current_offset)?;
850            output.write_u64::<LittleEndian>(data.len() as u64)?;
851            current_offset += data.len() as u64;
852        }
853
854        // Write data
855        for (_, _, data) in field_indexes {
856            output.extend_from_slice(&data);
857        }
858
859        Ok(output)
860    }
861
862    /// Build sparse vectors file containing BlockSparsePostingList per field/dimension
863    ///
864    /// File format (direct-indexed table for O(1) dimension lookup):
865    /// - Header: num_fields (u32)
866    /// - For each field:
867    ///   - field_id (u32)
868    ///   - quantization (u8)
869    ///   - max_dim_id (u32)          ← highest dimension ID + 1 (table size)
870    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed by dim_id
871    ///     (offset=0, length=0 means dimension not present)
872    /// - Data: concatenated serialized BlockSparsePostingList
873    fn build_sparse_file(&self) -> Result<Vec<u8>> {
874        use crate::structures::{BlockSparsePostingList, WeightQuantization};
875        use byteorder::{LittleEndian, WriteBytesExt};
876
877        if self.sparse_vectors.is_empty() {
878            return Ok(Vec::new());
879        }
880
881        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> serialized_bytes)
882        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
883        let mut field_data: Vec<SparseFieldData> = Vec::new();
884
885        for (&field_id, builder) in &self.sparse_vectors {
886            if builder.is_empty() {
887                continue;
888            }
889
890            let field = crate::dsl::Field(field_id);
891
892            // Get quantization from field config
893            let quantization = self
894                .schema
895                .get_field_entry(field)
896                .and_then(|e| e.sparse_vector_config.as_ref())
897                .map(|c| c.weight_quantization)
898                .unwrap_or(WeightQuantization::Float32);
899
900            // Find max dimension ID
901            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);
902
903            // Build BlockSparsePostingList for each dimension
904            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
905
906            for (&dim_id, postings) in &builder.postings {
907                // Sort postings by doc_id
908                let mut sorted_postings = postings.clone();
909                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);
910
911                // Build BlockSparsePostingList
912                let block_list =
913                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
914                        .map_err(crate::Error::Io)?;
915
916                // Serialize
917                let mut bytes = Vec::new();
918                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
919
920                dim_bytes.insert(dim_id, bytes);
921            }
922
923            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
924        }
925
926        if field_data.is_empty() {
927            return Ok(Vec::new());
928        }
929
930        // Sort by field_id
931        field_data.sort_by_key(|(id, _, _, _)| *id);
932
933        // Calculate header size
934        // Header: num_fields (4)
935        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
936        let mut header_size = 4u64;
937        for (_, _, max_dim_id, _) in &field_data {
938            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
939            header_size += (*max_dim_id as u64) * 12; // table entries: (offset: u64, length: u32)
940        }
941
942        // Build output
943        let mut output = Vec::new();
944
945        // Write num_fields
946        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
947
948        // Track current data offset (after all headers)
949        let mut current_offset = header_size;
950
951        // First, collect all data bytes in order and build offset tables
952        let mut all_data: Vec<u8> = Vec::new();
953        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
954
955        for (_, _, max_dim_id, dim_bytes) in &field_data {
956            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
957
958            // Process dimensions in order
959            for dim_id in 0..*max_dim_id {
960                if let Some(bytes) = dim_bytes.get(&dim_id) {
961                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
962                    current_offset += bytes.len() as u64;
963                    all_data.extend_from_slice(bytes);
964                }
965                // else: table entry stays (0, 0) meaning dimension not present
966            }
967
968            field_tables.push(table);
969        }
970
971        // Write field headers and tables
972        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
973            output.write_u32::<LittleEndian>(*field_id)?;
974            output.write_u8(*quantization as u8)?;
975            output.write_u32::<LittleEndian>(*max_dim_id)?;
976
977            // Write table (direct indexed by dim_id)
978            for &(offset, length) in &field_tables[i] {
979                output.write_u64::<LittleEndian>(offset)?;
980                output.write_u32::<LittleEndian>(length)?;
981            }
982        }
983
984        // Write data
985        output.extend_from_slice(&all_data);
986
987        Ok(output)
988    }
989
990    /// Build postings from inverted index
991    ///
992    /// Uses parallel processing to serialize posting lists concurrently.
993    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
994        // Phase 1: Collect and sort term keys (parallel key generation)
995        // Key format: field_id (4 bytes) + term bytes
996        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
997            .inverted_index
998            .iter()
999            .map(|(term_key, posting_list)| {
1000                let term_str = self.term_interner.resolve(&term_key.term);
1001                let mut key = Vec::with_capacity(4 + term_str.len());
1002                key.extend_from_slice(&term_key.field.to_le_bytes());
1003                key.extend_from_slice(term_str.as_bytes());
1004                (key, posting_list)
1005            })
1006            .collect();
1007
1008        // Sort by key for SSTable ordering
1009        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1010
1011        // Phase 2: Parallel serialization of posting lists
1012        // Each term's posting list is serialized independently
1013        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1014            .into_par_iter()
1015            .map(|(key, posting_builder)| {
1016                // Build posting list from in-memory postings
1017                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1018                for p in &posting_builder.postings {
1019                    full_postings.push(p.doc_id, p.term_freq as u32);
1020                }
1021
1022                // Build term info
1023                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1024                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1025
1026                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
1027                    SerializedPosting::Inline(inline)
1028                } else {
1029                    // Serialize to local buffer
1030                    let mut posting_bytes = Vec::new();
1031                    let block_list =
1032                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
1033                            .expect("BlockPostingList creation failed");
1034                    block_list
1035                        .serialize(&mut posting_bytes)
1036                        .expect("BlockPostingList serialization failed");
1037                    SerializedPosting::External {
1038                        bytes: posting_bytes,
1039                        doc_count: full_postings.doc_count(),
1040                    }
1041                };
1042
1043                (key, result)
1044            })
1045            .collect();
1046
1047        // Phase 3: Sequential assembly (must be sequential for offset calculation)
1048        let mut term_dict = Vec::new();
1049        let mut postings = Vec::new();
1050        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1051
1052        for (key, serialized_posting) in serialized {
1053            let term_info = match serialized_posting {
1054                SerializedPosting::Inline(info) => info,
1055                SerializedPosting::External { bytes, doc_count } => {
1056                    let posting_offset = postings.len() as u64;
1057                    let posting_len = bytes.len() as u32;
1058                    postings.extend_from_slice(&bytes);
1059                    TermInfo::external(posting_offset, posting_len, doc_count)
1060                }
1061            };
1062
1063            writer.insert(&key, &term_info)?;
1064        }
1065
1066        writer.finish()?;
1067        Ok((term_dict, postings))
1068    }
1069
1070    /// Build document store from streamed temp file (static method for parallel execution)
1071    ///
1072    /// Uses parallel processing to deserialize documents concurrently.
1073    fn build_store_parallel(
1074        store_path: &PathBuf,
1075        schema: &Schema,
1076        num_compression_threads: usize,
1077        compression_level: CompressionLevel,
1078    ) -> Result<Vec<u8>> {
1079        use super::store::EagerParallelStoreWriter;
1080
1081        let file = File::open(store_path)?;
1082        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1083
1084        // Phase 1: Parse document boundaries (sequential, fast)
1085        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1086        let mut offset = 0usize;
1087        while offset + 4 <= mmap.len() {
1088            let doc_len = u32::from_le_bytes([
1089                mmap[offset],
1090                mmap[offset + 1],
1091                mmap[offset + 2],
1092                mmap[offset + 3],
1093            ]) as usize;
1094            offset += 4;
1095
1096            if offset + doc_len > mmap.len() {
1097                break;
1098            }
1099
1100            doc_ranges.push((offset, doc_len));
1101            offset += doc_len;
1102        }
1103
1104        // Phase 2: Parallel deserialization of documents
1105        let docs: Vec<Document> = doc_ranges
1106            .into_par_iter()
1107            .filter_map(|(start, len)| {
1108                let doc_bytes = &mmap[start..start + len];
1109                super::store::deserialize_document(doc_bytes, schema).ok()
1110            })
1111            .collect();
1112
1113        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
1114        let mut store_data = Vec::new();
1115        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1116            &mut store_data,
1117            num_compression_threads,
1118            compression_level,
1119        );
1120
1121        for doc in &docs {
1122            store_writer.store(doc, schema)?;
1123        }
1124
1125        store_writer.finish()?;
1126        Ok(store_data)
1127    }
1128}
1129
1130impl Drop for SegmentBuilder {
1131    fn drop(&mut self) {
1132        // Cleanup temp files on drop
1133        let _ = std::fs::remove_file(&self.store_path);
1134    }
1135}