hermes_core/segment/builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Incremental posting flush**: Large posting lists flushed to temp file
//! - **Memory-mapped intermediate files**: Reduces memory pressure
//! - **Arena allocation**: Batch allocations for reduced fragmentation
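//!
//! # Example
//!
//! A minimal usage sketch; `schema`, `documents`, `dir`, and `segment_id` are
//! illustrative caller-provided values, not items defined by this module:
//!
//! ```ignore
//! let config = SegmentBuilderConfig::default();
//! let mut builder = SegmentBuilder::new(schema, config)?;
//! for doc in documents {
//!     builder.add_document(doc)?;
//! }
//! let meta = builder.build(&dir, segment_id).await?;
//! println!("built segment with {} docs", meta.num_docs);
//! ```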

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::wand::WandData;
use crate::{DocId, Result};

/// Size of the document store buffer before writing to disk
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Interned term key combining field and term
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// Compact posting entry for in-memory storage
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// In-memory posting list for a term
struct PostingListBuilder {
    /// In-memory postings
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Add a posting, merging if same doc_id as last
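    ///
    /// Illustrative behavior: `add(7, 2)` followed by `add(7, 3)` keeps a single
    /// posting `(doc_id: 7, term_freq: 5)`; frequencies saturate at `u16::MAX`.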
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Check if we can merge with the last posting
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            // Clamp the incoming u32 before narrowing: a bare `as u16` cast
            // would silently truncate frequencies above u16::MAX.
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

/// Intermediate result for parallel posting serialization
enum SerializedPosting {
    /// Inline posting (small enough to fit in TermInfo)
    Inline(TermInfo),
    /// External posting with serialized bytes
    External { bytes: Vec<u8>, doc_count: u32 },
}

/// Statistics for debugging segment builder performance
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector
    pub doc_field_lengths_size: usize,
}

/// Configuration for segment builder
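///
/// A sketch of overriding selected fields (values are illustrative):
///
/// ```ignore
/// let config = SegmentBuilderConfig {
///     compression_level: CompressionLevel(3),
///     ..SegmentBuilderConfig::default()
/// };
/// ```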
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
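    /// so field slot `s` of document `d` lives at index `d * num_indexed_fields + s`.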
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
}

/// Builder for dense vector index using RaBitQ
struct DenseVectorBuilder {
    /// Dimension of vectors
    dim: usize,
    /// Document IDs with vectors
    doc_ids: Vec<DocId>,
    /// Flat vector storage (doc_ids.len() * dim floats)
    vectors: Vec<f32>,
}

impl DenseVectorBuilder {
    fn new(dim: usize) -> Self {
        Self {
            dim,
            doc_ids: Vec::new(),
            vectors: Vec::new(),
        }
    }

    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
        self.doc_ids.push(doc_id);
        self.vectors.extend_from_slice(vector);
    }

    fn len(&self) -> usize {
        self.doc_ids.len()
    }

    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
    fn get_vectors(&self) -> Vec<Vec<f32>> {
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + self.dim].to_vec()
            })
            .collect()
    }

    /// Get vectors trimmed to specified dimension for matryoshka/MRL indexing
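    ///
    /// e.g. with `dim = 768` and `trim_dim = 256`, each returned vector keeps
    /// only its first 256 components (MRL-style prefix truncation).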
    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
        self.doc_ids
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * self.dim;
                self.vectors[start..start + trim_dim].to_vec()
            })
            .collect()
    }
}

impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Get current statistics for debugging performance
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    self.index_dense_vector_field(*field, doc_id, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector { indices, values }) => {
                    self.index_sparse_vector_field(*field, doc_id, indices, values)?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
    /// Instead of allocating a String per token, we:
    /// 1. Iterate over whitespace-split words
    /// 2. Build lowercase token in a reusable buffer
    /// 3. Intern directly from the buffer
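    ///
    /// Illustrative: `"Hello, WORLD!"` produces the tokens `hello` and `world`
    /// (non-alphanumeric characters dropped, case folded), so the returned
    /// `token_count` is 2.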
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
        for word in text.split_whitespace() {
            // Build lowercase token in reusable buffer
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            // Intern the term directly from buffer - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
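        // e.g. value 42 is indexed as the literal term "__num_42" with tf 1,
        // reusing the text posting machinery for exact-match numeric lookups.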
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Index a dense vector field
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // Verify dimension consistency
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }

    /// Index a sparse vector field using the inverted index
    ///
    /// Each dimension becomes a term (prefixed with __sparse_), and the weight
    /// is stored as a quantized term frequency. This allows reusing the existing
    /// posting list infrastructure for sparse vector retrieval.
    ///
    /// Weights below the configured `weight_threshold` are not indexed.
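    ///
    /// Illustrative: dimension 123 with weight 0.25 becomes the term
    /// `__sparse_123`, posted with quantized term_freq 250 (0.25 * 1000).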
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        indices: &[u32],
        values: &[f32],
    ) -> Result<()> {
        // Get weight threshold from field config (default 0.0 = no filtering)
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        for (&dim_id, &weight) in indices.iter().zip(values.iter()) {
            // Skip weights below threshold
            if weight.abs() < weight_threshold {
                continue;
            }

            // Use a special prefix to distinguish sparse vector dimensions from text terms
            let term_str = format!("__sparse_{}", dim_id);
            let term_spur = self.term_interner.get_or_intern(&term_str);

            let term_key = TermKey {
                field: field.0,
                term: term_spur,
            };

            // Quantize weight to term frequency (scale by 1000 for precision)
            // This is a simple approach; more sophisticated quantization can be added
            let quantized_weight = (weight.abs() * 1000.0).min(u16::MAX as f32) as u32;

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, quantized_weight.max(1));
        }

        Ok(())
    }

    /// Write document to streaming store
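    ///
    /// Each record is length-prefixed: `[len: u32 LE][doc_bytes]`. The boundary
    /// scan in `build_store_parallel` relies on exactly this framing.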
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Build the final segment
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Extract data needed for parallel processing
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Build postings and document store in parallel
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Build and write dense vector indexes (RaBitQ) - all in one file
        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

    /// Build unified vectors file containing all dense vector indexes
    ///
    /// File format:
    /// - Header: num_fields (u32)
    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
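    ///
    /// Each header entry is 4 + 1 + 8 + 8 = 21 bytes, so with two fields the
    /// data section starts at offset 4 + 2 * 21 = 46; offsets in the header
    /// are absolute within the file.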
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        // Build all indexes first: (field_id, index_type, data)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let field = crate::dsl::Field(field_id);

            // Get dense vector config
            let dense_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref());

            // Get vectors, potentially trimmed for matryoshka/MRL indexing
            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
            let vectors = if index_dim < builder.dim {
                // Trim vectors to mrl_dim for indexing
                builder.get_vectors_trimmed(index_dim)
            } else {
                builder.get_vectors()
            };

            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
                Some(VectorIndexType::ScaNN) => {
                    // ScaNN (IVF-PQ) index
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
                        })?;
                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
                    })?;

                    let coarse_centroids = crate::structures::CoarseCentroids::load(
                        std::path::Path::new(centroids_path),
                    )
                    .map_err(crate::Error::Io)?;

                    let pq_codebook =
                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
                            .map_err(crate::Error::Io)?;

                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
                        .with_store_raw(config.store_raw);

                    let ivfpq_index = crate::structures::IVFPQIndex::build(
                        ivfpq_config,
                        &coarse_centroids,
                        &pq_codebook,
                        &vectors,
                        Some(doc_ids.as_slice()),
                    );

                    // Serialize ScaNN index (IVFPQIndex only - codebook loaded separately)
                    let bytes = ivfpq_index
                        .to_bytes()
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (2u8, bytes) // 2 = ScaNN
                }
                Some(VectorIndexType::IvfRaBitQ) => {
                    // IVF-RaBitQ index
                    let config = dense_config.unwrap();
                    let centroids_path =
                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
                        })?;

                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
                        centroids_path,
                    )) {
                        Ok(coarse_centroids) => {
                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
                                .with_store_raw(config.store_raw);
                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
                                crate::structures::RaBitQConfig::new(index_dim),
                            );
                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_cfg,
                                &coarse_centroids,
                                &rabitq_codebook,
                                &vectors,
                                Some(doc_ids.as_slice()),
                            );
                            let bytes = ivf_index
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (1u8, bytes) // 1 = IVF-RaBitQ
                        }
                        Err(e) => {
                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                            let bytes = serde_json::to_vec(&idx)
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            (0u8, bytes) // 0 = RaBitQ
                        }
                    }
                }
                _ => {
                    // Default: RaBitQ
                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
                    let bytes = serde_json::to_vec(&idx)
                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                    (0u8, bytes) // 0 = RaBitQ
                }
            };

            field_indexes.push((field_id, index_type, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        // Sort by field_id for consistent ordering
        field_indexes.sort_by_key(|(id, _, _)| *id);

        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);

        // Build output
        let mut output = Vec::new();

        // Write number of fields
        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        // Calculate offsets and write header entries
        let mut current_offset = header_size as u64;
        for (field_id, index_type, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*index_type)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        // Write data
        for (_, _, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }

    /// Build postings from inverted index
    ///
    /// Uses parallel processing to serialize posting lists concurrently.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        // Phase 1: Collect and sort term keys (parallel key generation)
        // Key format: field_id (4 bytes) + term bytes
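        // e.g. field 3, term "cat" -> [0x03, 0x00, 0x00, 0x00, b'c', b'a', b't']
        // (fixed-length LE field prefix, so keys group by field, then by term bytes)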
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // Sort by key for SSTable ordering
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Phase 2: Parallel serialization of posting lists
        // Each term's posting list is serialized independently
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                // Build posting list from in-memory postings
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                // Build term info
                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                    SerializedPosting::Inline(inline)
                } else {
                    // Serialize to local buffer
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        // Phase 3: Sequential assembly (must be sequential for offset calculation)
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);
                    TermInfo::external(posting_offset, posting_len, doc_count)
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file
    ///
    /// Takes no `&self` (an associated function) so `rayon::join` can run it
    /// in parallel with `build_postings`. Documents are deserialized in
    /// parallel before being handed to the store writer.
    fn build_store_parallel(
        store_path: &Path,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Phase 1: Parse document boundaries (sequential, fast)
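        // Records are [len: u32 LE][doc_bytes], as framed by write_document_to_store.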
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Phase 2: Parallel deserialization of documents
        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
    }
}