hermes_core/segment/
builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents are written to a temp file on disk immediately
//! - **Memory-mapped intermediate files**: The temp store is mmapped during the final build, reducing memory pressure
//! - **Parallel finishing**: Posting serialization and store compression run concurrently via `rayon`
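//!
//! A minimal usage sketch (marked `ignore`: `Schema`/`Document` construction,
//! the concrete `Directory`, and `segment_id` are assumed to exist in scope):
//!
//! ```ignore
//! let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//! for doc in documents {
//!     builder.add_document(doc)?;
//! }
//! let meta = builder.build(&directory, segment_id).await?;
//! ```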

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::wand::WandData;
use crate::{DocId, Result};

/// Size of the document store buffer before writing to disk
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Interned term key combining field and term
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// Compact posting entry for in-memory storage
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// In-memory posting list for a term
struct PostingListBuilder {
    /// In-memory postings
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Add a posting, merging if same doc_id as last
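    ///
    /// e.g. `add(5, 2)` followed by `add(5, 3)` leaves a single posting
    /// `(doc 5, tf 5)`; term frequencies saturate at `u16::MAX`.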
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Check if we can merge with the last posting
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            // Clamp before narrowing so oversized u32 frequencies saturate
            // instead of being truncated by the cast
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

/// Intermediate result for parallel posting serialization
enum SerializedPosting {
    /// Inline posting (small enough to fit in TermInfo)
    Inline(TermInfo),
    /// External posting with serialized bytes
    External { bytes: Vec<u8>, doc_count: u32 },
}

/// Statistics for debugging segment builder performance
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector
    pub doc_field_lengths_size: usize,
}

/// Configuration for segment builder
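///
/// All fields are public, so a subset can be overridden with struct-update
/// syntax; a sketch with illustrative values:
///
/// ```ignore
/// let config = SegmentBuilderConfig {
///     compression_level: CompressionLevel(3),
///     num_compression_threads: 4,
///     ..Default::default()
/// };
/// ```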
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
}

/// Builder for dense vector index using RaBitQ
struct DenseVectorBuilder {
    /// Dimension of vectors
    dim: usize,
    /// Document IDs with vectors
    doc_ids: Vec<DocId>,
    /// Flat vector storage (doc_ids.len() * dim floats)
    vectors: Vec<f32>,
}

impl DenseVectorBuilder {
    fn new(dim: usize) -> Self {
        Self {
            dim,
            doc_ids: Vec::new(),
            vectors: Vec::new(),
        }
    }

    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
        self.doc_ids.push(doc_id);
        self.vectors.extend_from_slice(vector);
    }

    fn len(&self) -> usize {
        self.doc_ids.len()
    }

    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
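    ///
    /// Note: this copies every vector out of the flat buffer, so peak memory
    /// is roughly twice the stored vector data while the index is built.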
    fn get_vectors(&self) -> Vec<Vec<f32>> {
        self.vectors
            .chunks_exact(self.dim)
            .map(|chunk| chunk.to_vec())
            .collect()
    }
}

impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        // Unique suffix so concurrent builders never collide on the temp file
        // (the actual segment id is supplied later, in `build`)
        let temp_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", temp_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Get current statistics for debugging performance
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
                    self.index_dense_vector_field(*field, doc_id, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector { indices, values }) => {
                    self.index_sparse_vector_field(*field, doc_id, indices, values)?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
    /// Instead of allocating a String per token, we:
    /// 1. Iterate over whitespace-split words
    /// 2. Build lowercase token in a reusable buffer
    /// 3. Intern directly from the buffer
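    ///
    /// e.g. `"Hello, World-2!"` yields the tokens `hello` and `world2`:
    /// non-alphanumeric characters are dropped in place rather than treated
    /// as split points, so `don't` becomes `dont`, not `don` + `t`.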
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
        for word in text.split_whitespace() {
            // Build lowercase token in reusable buffer
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            // Intern the term directly from buffer - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
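        // (decimal strings under a `__num_` prefix), so lookups are exact-match
        // only: the lexicographic order of these terms does not match numeric
        // order, and range queries would need an order-preserving encoding.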
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Index a dense vector field
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // Verify dimension consistency with any previously added vectors
        if builder.len() > 0 && builder.dim != dim {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, vector);
        Ok(())
    }

    /// Index a sparse vector field using the inverted index
    ///
    /// Each dimension becomes a term (prefixed with __sparse_), and the weight
    /// is stored as a quantized term frequency. This allows reusing the existing
    /// posting list infrastructure for sparse vector retrieval.
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        indices: &[u32],
        values: &[f32],
    ) -> Result<()> {
        // zip() would silently truncate a mismatched pair, so catch it in debug builds
        debug_assert_eq!(
            indices.len(),
            values.len(),
            "Sparse vector indices/values length mismatch"
        );
        for (&dim_id, &weight) in indices.iter().zip(values.iter()) {
            // Use a special prefix to distinguish sparse vector dimensions from text terms
            let term_str = format!("__sparse_{}", dim_id);
            let term_spur = self.term_interner.get_or_intern(&term_str);

            let term_key = TermKey {
                field: field.0,
                term: term_spur,
            };

            // Quantize weight to term frequency (scale by 1000 for precision)
            // This is a simple approach; more sophisticated quantization can be added
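            // Worked examples: 0.5 -> tf 500; 70.0 saturates at 65_535
            // (u16::MAX); 0.0005 quantizes to 0 and is clamped up to 1 below,
            // so very small weights survive but lose their magnitude.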
            let quantized_weight = (weight.abs() * 1000.0).min(u16::MAX as f32) as u32;

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, quantized_weight.max(1));
        }

        Ok(())
    }

    /// Write document to streaming store
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Build the final segment
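    ///
    /// Flushes the temp store, then writes the term dictionary, postings,
    /// document store, an optional vectors file, and the segment metadata
    /// through the directory before deleting the temporary spill file.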
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Extract data needed for parallel processing
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Build postings and document store in parallel
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Build and write dense vector indexes (RaBitQ) - all in one file
        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

    /// Build unified vectors file containing all dense vector indexes
    ///
    /// File format:
    /// - Header: num_fields (u32)
    /// - For each field: field_id (u32), offset (u64), length (u64)
    /// - Data: concatenated serialized RaBitQ indexes
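    ///
    /// e.g. with two fields the header occupies 4 + 2 * (4 + 8 + 8) = 44
    /// bytes, so the first index starts at offset 44 and the second at
    /// 44 + the first index's serialized length.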
    fn build_vectors_file(&self) -> Result<Vec<u8>> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // Build all indexes first
        let mut field_indexes: Vec<(u32, Vec<u8>)> = Vec::new();

        for (&field_id, builder) in &self.dense_vectors {
            if builder.len() == 0 {
                continue;
            }

            let vectors = builder.get_vectors();
            let field = crate::dsl::Field(field_id);

            // Check if field has IVF config
            let ivf_config = self
                .schema
                .get_field_entry(field)
                .and_then(|e| e.dense_vector_config.as_ref())
                .filter(|c| c.uses_ivf());

            let index_bytes = if let Some(dense_config) = ivf_config {
                // Try to load coarse centroids for IVF-RaBitQ
                let centroids_path = dense_config
                    .coarse_centroids_path
                    .as_ref()
                    .expect("uses_ivf() implies coarse_centroids_path is set");
                match crate::structures::CoarseCentroids::load(Path::new(centroids_path)) {
                    Ok(coarse_centroids) => {
                        let ivf_cfg = crate::structures::IVFConfig::new(builder.dim);
                        let doc_ids: Vec<u32> = builder.doc_ids.clone();
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_cfg,
                            &coarse_centroids,
                            &vectors,
                            Some(&doc_ids),
                        );
                        serde_json::to_vec(&ivf_index)
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?
                    }
                    Err(e) => {
                        log::warn!("Failed to load centroids: {}, falling back to plain RaBitQ", e);
                        let cfg = crate::structures::RaBitQConfig::new(builder.dim);
                        let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                        serde_json::to_vec(&idx)
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?
                    }
                }
            } else {
                // Regular RaBitQ
                let cfg = crate::structures::RaBitQConfig::new(builder.dim);
                let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
                serde_json::to_vec(&idx).map_err(|e| crate::Error::Serialization(e.to_string()))?
            };

            field_indexes.push((field_id, index_bytes));
        }

        if field_indexes.is_empty() {
            return Ok(Vec::new());
        }

        // Sort by field_id for consistent ordering
        field_indexes.sort_by_key(|(id, _)| *id);

        // Calculate header size: num_fields + (field_id, offset, len) per field
        let header_size = 4 + field_indexes.len() * (4 + 8 + 8);

        // Build output
        let mut output = Vec::new();

        // Write number of fields
        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

        // Calculate offsets and write header entries
        let mut current_offset = header_size as u64;
        for (field_id, data) in &field_indexes {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u64::<LittleEndian>(current_offset)?;
            output.write_u64::<LittleEndian>(data.len() as u64)?;
            current_offset += data.len() as u64;
        }

        // Write data
        for (_, data) in field_indexes {
            output.extend_from_slice(&data);
        }

        Ok(output)
    }

    /// Build postings from inverted index
    ///
    /// Uses parallel processing to serialize posting lists concurrently.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        // Phase 1: Collect and sort term keys
        // Key format: field_id (4 bytes, little-endian) + term bytes
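        // e.g. field 3, term "rust" -> key = [0x03, 0x00, 0x00, 0x00, b'r', b'u', b's', b't']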
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // Sort by key for SSTable ordering
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Phase 2: Parallel serialization of posting lists
        // Each term's posting list is serialized independently
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                // Build posting list from in-memory postings
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                // Build term info
                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                    SerializedPosting::Inline(inline)
                } else {
                    // Serialize to local buffer
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        // Phase 3: Sequential assembly (must be sequential for offset calculation)
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);
                    TermInfo::external(posting_offset, posting_len, doc_count)
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file (static method for parallel execution)
    ///
    /// Uses parallel processing to deserialize documents concurrently.
    fn build_store_parallel(
        store_path: &Path,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
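        // SAFETY: the temp file is created and written only by this builder
        // and was flushed before this call; nothing mutates it while mapped.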
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Phase 1: Parse document boundaries (sequential, fast)
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Phase 2: Parallel deserialization of documents
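        // Note: documents that fail to deserialize are silently skipped, which
        // assumes the temp file always round-trips; a skipped document would
        // shift the store ordinals of every following document.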
        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
    }
}