Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::path::PathBuf;
20
21use hashbrown::HashMap;
22use lasso::{Rodeo, Spur};
23use rayon::prelude::*;
24use rustc_hash::FxHashMap;
25
26use crate::compression::CompressionLevel;
27
28use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
29use crate::directories::{Directory, DirectoryWriter};
30use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
31use crate::structures::{PostingList, SSTableWriter, TermInfo};
32use crate::tokenizer::BoxedTokenizer;
33use crate::{DocId, Result};
34
35use posting::{
36    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
37};
38use vectors::{DenseVectorBuilder, SparseVectorBuilder};
39
40// Re-export from vector_data for backwards compatibility
41pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
42
43/// Size of the document store buffer before writing to disk
44const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
45
46/// Segment builder with optimized memory usage
47///
48/// Features:
49/// - Streams documents to disk immediately (no in-memory document storage)
50/// - Uses string interning for terms (reduced allocations)
51/// - Uses hashbrown HashMap (faster than BTreeMap)
52pub struct SegmentBuilder {
53    schema: Schema,
54    config: SegmentBuilderConfig,
55    tokenizers: FxHashMap<Field, BoxedTokenizer>,
56
57    /// String interner for terms - O(1) lookup and deduplication
58    term_interner: Rodeo,
59
60    /// Inverted index: term key -> posting list
61    inverted_index: HashMap<TermKey, PostingListBuilder>,
62
63    /// Streaming document store writer
64    store_file: BufWriter<File>,
65    store_path: PathBuf,
66
67    /// Document count
68    next_doc_id: DocId,
69
70    /// Per-field statistics for BM25F
71    field_stats: FxHashMap<u32, FieldStats>,
72
73    /// Per-document field lengths stored compactly
74    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
75    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
76    doc_field_lengths: Vec<u32>,
77    num_indexed_fields: usize,
78    field_to_slot: FxHashMap<u32, usize>,
79
80    /// Reusable buffer for per-document term frequency aggregation
81    /// Avoids allocating a new hashmap for each document
82    local_tf_buffer: FxHashMap<Spur, u32>,
83
84    /// Reusable buffer for tokenization to avoid per-token String allocations
85    token_buffer: String,
86
87    /// Dense vector storage per field: field -> (doc_ids, vectors)
88    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
89    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
90
91    /// Sparse vector storage per field: field -> SparseVectorBuilder
92    /// Uses proper BlockSparsePostingList with configurable quantization
93    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
94
95    /// Position index for fields with positions enabled
96    /// term key -> position posting list
97    position_index: HashMap<TermKey, PositionPostingListBuilder>,
98
99    /// Fields that have position tracking enabled, with their mode
100    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
101
102    /// Current element ordinal for multi-valued fields (reset per document)
103    current_element_ordinal: FxHashMap<u32, u32>,
104
105    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
106    estimated_memory: usize,
107}
108
109impl SegmentBuilder {
110    /// Create a new segment builder
111    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
112        let segment_id = uuid::Uuid::new_v4();
113        let store_path = config
114            .temp_dir
115            .join(format!("hermes_store_{}.tmp", segment_id));
116
117        let store_file = BufWriter::with_capacity(
118            STORE_BUFFER_SIZE,
119            OpenOptions::new()
120                .create(true)
121                .write(true)
122                .truncate(true)
123                .open(&store_path)?,
124        );
125
126        // Count indexed fields for compact field length storage
127        // Also track which fields have position recording enabled
128        let mut num_indexed_fields = 0;
129        let mut field_to_slot = FxHashMap::default();
130        let mut position_enabled_fields = FxHashMap::default();
131        for (field, entry) in schema.fields() {
132            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
133                field_to_slot.insert(field.0, num_indexed_fields);
134                num_indexed_fields += 1;
135                if entry.positions.is_some() {
136                    position_enabled_fields.insert(field.0, entry.positions);
137                }
138            }
139        }
140
141        Ok(Self {
142            schema,
143            tokenizers: FxHashMap::default(),
144            term_interner: Rodeo::new(),
145            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
146            store_file,
147            store_path,
148            next_doc_id: 0,
149            field_stats: FxHashMap::default(),
150            doc_field_lengths: Vec::new(),
151            num_indexed_fields,
152            field_to_slot,
153            local_tf_buffer: FxHashMap::default(),
154            token_buffer: String::with_capacity(64),
155            config,
156            dense_vectors: FxHashMap::default(),
157            sparse_vectors: FxHashMap::default(),
158            position_index: HashMap::new(),
159            position_enabled_fields,
160            current_element_ordinal: FxHashMap::default(),
161            estimated_memory: 0,
162        })
163    }
164
165    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
166        self.tokenizers.insert(field, tokenizer);
167    }
168
169    pub fn num_docs(&self) -> u32 {
170        self.next_doc_id
171    }
172
173    /// Fast O(1) memory estimate - updated incrementally during indexing
174    #[inline]
175    pub fn estimated_memory_bytes(&self) -> usize {
176        self.estimated_memory
177    }
178
179    /// Count total unique sparse dimensions across all fields
180    pub fn sparse_dim_count(&self) -> usize {
181        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
182    }
183
184    /// Get current statistics for debugging performance (expensive - iterates all data)
185    pub fn stats(&self) -> SegmentBuilderStats {
186        use std::mem::size_of;
187
188        let postings_in_memory: usize =
189            self.inverted_index.values().map(|p| p.postings.len()).sum();
190
191        // Size constants computed from actual types
192        let compact_posting_size = size_of::<CompactPosting>();
193        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
194        let term_key_size = size_of::<TermKey>();
195        let posting_builder_size = size_of::<PostingListBuilder>();
196        let spur_size = size_of::<lasso::Spur>();
197        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
198
199        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
200        // Measured: ~(key_size + value_size + 8) per entry on average
201        let hashmap_entry_base_overhead = 8usize;
202
203        // FxHashMap uses same layout as hashbrown
204        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
205
206        // Postings memory
207        let postings_bytes: usize = self
208            .inverted_index
209            .values()
210            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
211            .sum();
212
213        // Inverted index overhead
214        let index_overhead_bytes = self.inverted_index.len()
215            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
216
217        // Term interner: Rodeo stores strings + metadata
218        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
219        let interner_arena_overhead = 2 * size_of::<usize>();
220        let avg_term_len = 8; // Estimated average term length
221        let interner_bytes =
222            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
223
224        // Doc field lengths
225        let field_lengths_bytes =
226            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
227
228        // Dense vectors
229        let mut dense_vectors_bytes: usize = 0;
230        let mut dense_vector_count: usize = 0;
231        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
232        for b in self.dense_vectors.values() {
233            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
234                + b.doc_ids.capacity() * doc_id_ordinal_size
235                + 2 * vec_overhead; // Two Vecs
236            dense_vector_count += b.doc_ids.len();
237        }
238
239        // Local buffers
240        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
241        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
242
243        // Sparse vectors
244        let mut sparse_vectors_bytes: usize = 0;
245        for builder in self.sparse_vectors.values() {
246            for postings in builder.postings.values() {
247                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
248            }
249            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
250            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
251            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
252        }
253        // Outer FxHashMap overhead
254        let outer_sparse_entry_size =
255            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
256        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
257
258        // Position index
259        let mut position_index_bytes: usize = 0;
260        for pos_builder in self.position_index.values() {
261            for (_, positions) in &pos_builder.postings {
262                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
263            }
264            // Vec<(DocId, Vec<u32>)> entry size
265            let pos_entry_size = size_of::<DocId>() + vec_overhead;
266            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
267        }
268        // HashMap overhead for position_index
269        let pos_index_entry_size =
270            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
271        position_index_bytes += self.position_index.len() * pos_index_entry_size;
272
273        let estimated_memory_bytes = postings_bytes
274            + index_overhead_bytes
275            + interner_bytes
276            + field_lengths_bytes
277            + dense_vectors_bytes
278            + local_tf_buffer_bytes
279            + sparse_vectors_bytes
280            + position_index_bytes;
281
282        let memory_breakdown = MemoryBreakdown {
283            postings_bytes,
284            index_overhead_bytes,
285            interner_bytes,
286            field_lengths_bytes,
287            dense_vectors_bytes,
288            dense_vector_count,
289            sparse_vectors_bytes,
290            position_index_bytes,
291        };
292
293        SegmentBuilderStats {
294            num_docs: self.next_doc_id,
295            unique_terms: self.inverted_index.len(),
296            postings_in_memory,
297            interned_strings: self.term_interner.len(),
298            doc_field_lengths_size: self.doc_field_lengths.len(),
299            estimated_memory_bytes,
300            memory_breakdown,
301        }
302    }
303
304    /// Add a document - streams to disk immediately
305    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
306        let doc_id = self.next_doc_id;
307        self.next_doc_id += 1;
308
309        // Initialize field lengths for this document
310        let base_idx = self.doc_field_lengths.len();
311        self.doc_field_lengths
312            .resize(base_idx + self.num_indexed_fields, 0);
313
314        // Reset element ordinals for this document (for multi-valued fields)
315        self.current_element_ordinal.clear();
316
317        for (field, value) in doc.field_values() {
318            let entry = self.schema.get_field_entry(*field);
319            if entry.is_none() || !entry.unwrap().indexed {
320                continue;
321            }
322
323            let entry = entry.unwrap();
324            match (&entry.field_type, value) {
325                (FieldType::Text, FieldValue::Text(text)) => {
326                    // Get current element ordinal for multi-valued fields
327                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
328                    let token_count =
329                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
330                    // Increment element ordinal for next value of this field
331                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
332
333                    // Update field statistics
334                    let stats = self.field_stats.entry(field.0).or_default();
335                    stats.total_tokens += token_count as u64;
336                    stats.doc_count += 1;
337
338                    // Store field length compactly
339                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
340                        self.doc_field_lengths[base_idx + slot] = token_count;
341                    }
342                }
343                (FieldType::U64, FieldValue::U64(v)) => {
344                    self.index_numeric_field(*field, doc_id, *v)?;
345                }
346                (FieldType::I64, FieldValue::I64(v)) => {
347                    self.index_numeric_field(*field, doc_id, *v as u64)?;
348                }
349                (FieldType::F64, FieldValue::F64(v)) => {
350                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
351                }
352                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
353                    // Get current element ordinal for multi-valued fields
354                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
355                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
356                    // Increment element ordinal for next value of this field
357                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
358                }
359                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
360                    // Get current element ordinal for multi-valued fields
361                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
362                    self.index_sparse_vector_field(
363                        *field,
364                        doc_id,
365                        element_ordinal as u16,
366                        entries,
367                    )?;
368                    // Increment element ordinal for next value of this field
369                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
370                }
371                _ => {}
372            }
373        }
374
375        // Stream document to disk immediately
376        self.write_document_to_store(&doc)?;
377
378        Ok(doc_id)
379    }
380
381    /// Index a text field using interned terms
382    ///
383    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
384    /// Instead of allocating a String per token, we:
385    /// 1. Iterate over whitespace-split words
386    /// 2. Build lowercase token in a reusable buffer
387    /// 3. Intern directly from the buffer
388    ///
389    /// If position recording is enabled for this field, also records token positions
390    /// encoded as (element_ordinal << 20) | token_position.
391    fn index_text_field(
392        &mut self,
393        field: Field,
394        doc_id: DocId,
395        text: &str,
396        element_ordinal: u32,
397    ) -> Result<u32> {
398        use crate::dsl::PositionMode;
399
400        let field_id = field.0;
401        let position_mode = self
402            .position_enabled_fields
403            .get(&field_id)
404            .copied()
405            .flatten();
406
407        // Phase 1: Aggregate term frequencies within this document
408        // Also collect positions if enabled
409        // Reuse buffer to avoid allocations
410        self.local_tf_buffer.clear();
411
412        // For position tracking: term -> list of positions in this text
413        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
414
415        let mut token_position = 0u32;
416
417        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
418        for word in text.split_whitespace() {
419            // Build lowercase token in reusable buffer
420            self.token_buffer.clear();
421            for c in word.chars() {
422                if c.is_alphanumeric() {
423                    for lc in c.to_lowercase() {
424                        self.token_buffer.push(lc);
425                    }
426                }
427            }
428
429            if self.token_buffer.is_empty() {
430                continue;
431            }
432
433            // Intern the term directly from buffer - O(1) amortized
434            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
435            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
436
437            // Record position based on mode
438            if let Some(mode) = position_mode {
439                let encoded_pos = match mode {
440                    // Ordinal only: just store element ordinal (token position = 0)
441                    PositionMode::Ordinal => element_ordinal << 20,
442                    // Token position only: just store token position (ordinal = 0)
443                    PositionMode::TokenPosition => token_position,
444                    // Full: encode both
445                    PositionMode::Full => (element_ordinal << 20) | token_position,
446                };
447                local_positions
448                    .entry(term_spur)
449                    .or_default()
450                    .push(encoded_pos);
451            }
452
453            token_position += 1;
454        }
455
456        // Phase 2: Insert aggregated terms into inverted index
457        // Now we only do one inverted_index lookup per unique term in doc
458        for (&term_spur, &tf) in &self.local_tf_buffer {
459            let term_key = TermKey {
460                field: field_id,
461                term: term_spur,
462            };
463
464            let is_new_term = !self.inverted_index.contains_key(&term_key);
465            let posting = self
466                .inverted_index
467                .entry(term_key)
468                .or_insert_with(PostingListBuilder::new);
469            posting.add(doc_id, tf);
470
471            // Incremental memory tracking
472            use std::mem::size_of;
473            self.estimated_memory += size_of::<CompactPosting>();
474            if is_new_term {
475                // HashMap entry overhead + PostingListBuilder + Vec header
476                self.estimated_memory +=
477                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
478            }
479
480            // Add positions if enabled
481            if position_mode.is_some()
482                && let Some(positions) = local_positions.get(&term_spur)
483            {
484                let pos_posting = self
485                    .position_index
486                    .entry(term_key)
487                    .or_insert_with(PositionPostingListBuilder::new);
488                for &pos in positions {
489                    pos_posting.add_position(doc_id, pos);
490                }
491            }
492        }
493
494        Ok(token_position)
495    }
496
497    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
498        // For numeric fields, we use a special encoding
499        let term_str = format!("__num_{}", value);
500        let term_spur = self.term_interner.get_or_intern(&term_str);
501
502        let term_key = TermKey {
503            field: field.0,
504            term: term_spur,
505        };
506
507        let posting = self
508            .inverted_index
509            .entry(term_key)
510            .or_insert_with(PostingListBuilder::new);
511        posting.add(doc_id, 1);
512
513        Ok(())
514    }
515
516    /// Index a dense vector field with ordinal tracking
517    fn index_dense_vector_field(
518        &mut self,
519        field: Field,
520        doc_id: DocId,
521        ordinal: u16,
522        vector: &[f32],
523    ) -> Result<()> {
524        let dim = vector.len();
525
526        let builder = self
527            .dense_vectors
528            .entry(field.0)
529            .or_insert_with(|| DenseVectorBuilder::new(dim));
530
531        // Verify dimension consistency
532        if builder.dim != dim && builder.len() > 0 {
533            return Err(crate::Error::Schema(format!(
534                "Dense vector dimension mismatch: expected {}, got {}",
535                builder.dim, dim
536            )));
537        }
538
539        builder.add(doc_id, ordinal, vector);
540
541        // Incremental memory tracking
542        use std::mem::{size_of, size_of_val};
543        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
544
545        Ok(())
546    }
547
548    /// Index a sparse vector field using dedicated sparse posting lists
549    ///
550    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
551    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
552    ///
553    /// Weights below the configured `weight_threshold` are not indexed.
554    fn index_sparse_vector_field(
555        &mut self,
556        field: Field,
557        doc_id: DocId,
558        ordinal: u16,
559        entries: &[(u32, f32)],
560    ) -> Result<()> {
561        // Get weight threshold from field config (default 0.0 = no filtering)
562        let weight_threshold = self
563            .schema
564            .get_field_entry(field)
565            .and_then(|entry| entry.sparse_vector_config.as_ref())
566            .map(|config| config.weight_threshold)
567            .unwrap_or(0.0);
568
569        let builder = self
570            .sparse_vectors
571            .entry(field.0)
572            .or_insert_with(SparseVectorBuilder::new);
573
574        for &(dim_id, weight) in entries {
575            // Skip weights below threshold
576            if weight.abs() < weight_threshold {
577                continue;
578            }
579
580            // Incremental memory tracking
581            use std::mem::size_of;
582            let is_new_dim = !builder.postings.contains_key(&dim_id);
583            builder.add(dim_id, doc_id, ordinal, weight);
584            self.estimated_memory += size_of::<(DocId, u16, f32)>();
585            if is_new_dim {
586                // HashMap entry overhead + Vec header
587                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
588            }
589        }
590
591        Ok(())
592    }
593
594    /// Write document to streaming store
595    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
596        use byteorder::{LittleEndian, WriteBytesExt};
597
598        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
599
600        self.store_file
601            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
602        self.store_file.write_all(&doc_bytes)?;
603
604        Ok(())
605    }
606
607    /// Build the final segment
608    pub async fn build<D: Directory + DirectoryWriter>(
609        mut self,
610        dir: &D,
611        segment_id: SegmentId,
612    ) -> Result<SegmentMeta> {
613        // Flush any buffered data
614        self.store_file.flush()?;
615
616        let files = SegmentFiles::new(segment_id.0);
617
618        // Build positions FIRST to get offsets for TermInfo
619        let (positions_data, position_offsets) = self.build_positions_file()?;
620
621        // Extract data needed for parallel processing
622        let store_path = self.store_path.clone();
623        let schema = self.schema.clone();
624        let num_compression_threads = self.config.num_compression_threads;
625        let compression_level = self.config.compression_level;
626
627        // Build postings and document store in parallel
628        let (postings_result, store_result) = rayon::join(
629            || self.build_postings(&position_offsets),
630            || {
631                Self::build_store_parallel(
632                    &store_path,
633                    &schema,
634                    num_compression_threads,
635                    compression_level,
636                )
637            },
638        );
639
640        let (term_dict_data, postings_data) = postings_result?;
641        let store_data = store_result?;
642
643        // Write to directory
644        dir.write(&files.term_dict, &term_dict_data).await?;
645        dir.write(&files.postings, &postings_data).await?;
646        dir.write(&files.store, &store_data).await?;
647
648        // Write positions file (data only, offsets are in TermInfo)
649        if !positions_data.is_empty() {
650            dir.write(&files.positions, &positions_data).await?;
651        }
652
653        // Build and write dense vector indexes (RaBitQ) - all in one file
654        if !self.dense_vectors.is_empty() {
655            let vectors_data = self.build_vectors_file()?;
656            if !vectors_data.is_empty() {
657                dir.write(&files.vectors, &vectors_data).await?;
658            }
659        }
660
661        // Build and write sparse vector posting lists
662        if !self.sparse_vectors.is_empty() {
663            let sparse_data = self.build_sparse_file()?;
664            if !sparse_data.is_empty() {
665                dir.write(&files.sparse, &sparse_data).await?;
666            }
667        }
668
669        let meta = SegmentMeta {
670            id: segment_id.0,
671            num_docs: self.next_doc_id,
672            field_stats: self.field_stats.clone(),
673        };
674
675        dir.write(&files.meta, &meta.serialize()?).await?;
676
677        // Cleanup temp files
678        let _ = std::fs::remove_file(&self.store_path);
679
680        Ok(meta)
681    }
682
683    /// Build unified vectors file containing all dense vector indexes
684    ///
685    /// File format:
686    /// - Header: num_fields (u32)
687    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
688    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
689    fn build_vectors_file(&self) -> Result<Vec<u8>> {
690        use byteorder::{LittleEndian, WriteBytesExt};
691
692        // Build all indexes first: (field_id, index_type, data)
693        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
694
695        for (&field_id, builder) in &self.dense_vectors {
696            if builder.len() == 0 {
697                continue;
698            }
699
700            let field = crate::dsl::Field(field_id);
701
702            // Get dense vector config
703            let dense_config = self
704                .schema
705                .get_field_entry(field)
706                .and_then(|e| e.dense_vector_config.as_ref());
707
708            // Get vectors, potentially trimmed for matryoshka/MRL indexing
709            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
710            let vectors = if index_dim < builder.dim {
711                // Trim vectors to mrl_dim for indexing
712                builder.get_vectors_trimmed(index_dim)
713            } else {
714                builder.get_vectors()
715            };
716
717            // During normal indexing, segments always store Flat (raw vectors).
718            // ANN indexes are built at index-level via build_vector_index() which
719            // trains centroids/codebooks once from all vectors and triggers rebuild.
720            let flat_data = FlatVectorData {
721                dim: index_dim,
722                vectors: vectors.clone(),
723                doc_ids: builder.doc_ids.clone(),
724            };
725            let index_bytes = serde_json::to_vec(&flat_data)
726                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
727            let index_type = 3u8; // 3 = Flat
728
729            field_indexes.push((field_id, index_type, index_bytes));
730        }
731
732        if field_indexes.is_empty() {
733            return Ok(Vec::new());
734        }
735
736        // Sort by field_id for consistent ordering
737        field_indexes.sort_by_key(|(id, _, _)| *id);
738
739        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
740        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
741
742        // Build output
743        let mut output = Vec::new();
744
745        // Write number of fields
746        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
747
748        // Calculate offsets and write header entries
749        let mut current_offset = header_size as u64;
750        for (field_id, index_type, data) in &field_indexes {
751            output.write_u32::<LittleEndian>(*field_id)?;
752            output.write_u8(*index_type)?;
753            output.write_u64::<LittleEndian>(current_offset)?;
754            output.write_u64::<LittleEndian>(data.len() as u64)?;
755            current_offset += data.len() as u64;
756        }
757
758        // Write data
759        for (_, _, data) in field_indexes {
760            output.extend_from_slice(&data);
761        }
762
763        Ok(output)
764    }
765
766    /// Build sparse vectors file containing BlockSparsePostingList per field/dimension
767    ///
768    /// File format (compact - only active dimensions):
769    /// - Header: num_fields (u32)
770    /// - For each field:
771    ///   - field_id (u32)
772    ///   - quantization (u8)
773    ///   - num_dims (u32)            ← number of active dimensions
774    ///   - table: [(dim_id: u32, offset: u64, length: u32)] × num_dims
775    /// - Data: concatenated serialized BlockSparsePostingList
776    fn build_sparse_file(&self) -> Result<Vec<u8>> {
777        use crate::structures::{BlockSparsePostingList, WeightQuantization};
778        use byteorder::{LittleEndian, WriteBytesExt};
779
780        if self.sparse_vectors.is_empty() {
781            return Ok(Vec::new());
782        }
783
784        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> serialized_bytes)
785        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
786        let mut field_data: Vec<SparseFieldData> = Vec::new();
787
788        for (&field_id, builder) in &self.sparse_vectors {
789            if builder.is_empty() {
790                continue;
791            }
792
793            let field = crate::dsl::Field(field_id);
794
795            // Get config from field
796            let sparse_config = self
797                .schema
798                .get_field_entry(field)
799                .and_then(|e| e.sparse_vector_config.as_ref());
800
801            let quantization = sparse_config
802                .map(|c| c.weight_quantization)
803                .unwrap_or(WeightQuantization::Float32);
804
805            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
806
807            // Find max dimension ID
808            let _max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);
809
810            // Build BlockSparsePostingList for each dimension
811            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
812
813            for (&dim_id, postings) in &builder.postings {
814                // Sort postings by doc_id, then by ordinal
815                let mut sorted_postings = postings.clone();
816                sorted_postings.sort_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
817
818                // Build BlockSparsePostingList with configured block size
819                let block_list = BlockSparsePostingList::from_postings_with_block_size(
820                    &sorted_postings,
821                    quantization,
822                    block_size,
823                )
824                .map_err(crate::Error::Io)?;
825
826                // Serialize
827                let mut bytes = Vec::new();
828                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
829
830                dim_bytes.insert(dim_id, bytes);
831            }
832
833            // Store num_dims (active count) instead of max_dim_id
834            field_data.push((field_id, quantization, dim_bytes.len() as u32, dim_bytes));
835        }
836
837        if field_data.is_empty() {
838            return Ok(Vec::new());
839        }
840
841        // Sort by field_id
842        field_data.sort_by_key(|(id, _, _, _)| *id);
843
844        // Calculate header size (compact format)
845        // Header: num_fields (4)
846        // Per field: field_id (4) + quant (1) + num_dims (4) + table (16 * num_dims)
847        let mut header_size = 4u64;
848        for (_, _, num_dims, _) in &field_data {
849            header_size += 4 + 1 + 4; // field_id + quant + num_dims
850            header_size += (*num_dims as u64) * 16; // table entries: (dim_id: u32, offset: u64, length: u32)
851        }
852
853        // Build output
854        let mut output = Vec::new();
855
856        // Write num_fields
857        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
858
859        // Track current data offset (after all headers)
860        let mut current_offset = header_size;
861
862        // First, collect all data bytes in order and build compact offset tables
863        let mut all_data: Vec<u8> = Vec::new();
864        // (dim_id, offset, length) for each active dimension
865        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
866
867        for (_, _, _, dim_bytes) in &field_data {
868            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
869
870            // Sort dimensions for deterministic output
871            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
872            dims.sort();
873
874            for dim_id in dims {
875                let bytes = &dim_bytes[&dim_id];
876                table.push((dim_id, current_offset, bytes.len() as u32));
877                current_offset += bytes.len() as u64;
878                all_data.extend_from_slice(bytes);
879            }
880
881            field_tables.push(table);
882        }
883
884        // Write field headers and compact tables
885        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
886            output.write_u32::<LittleEndian>(*field_id)?;
887            output.write_u8(*quantization as u8)?;
888            output.write_u32::<LittleEndian>(*num_dims)?;
889
890            // Write compact table: (dim_id, offset, length) only for active dims
891            for &(dim_id, offset, length) in &field_tables[i] {
892                output.write_u32::<LittleEndian>(dim_id)?;
893                output.write_u64::<LittleEndian>(offset)?;
894                output.write_u32::<LittleEndian>(length)?;
895            }
896        }
897
898        // Write data
899        output.extend_from_slice(&all_data);
900
901        Ok(output)
902    }
903
904    /// Build positions file for phrase queries
905    ///
906    /// File format:
907    /// - Data only: concatenated serialized PositionPostingList
908    /// - Position offsets are stored in TermInfo (no separate header needed)
909    ///
910    /// Returns: (positions_data, term_key -> (offset, len) mapping)
911    #[allow(clippy::type_complexity)]
912    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
913        use crate::structures::PositionPostingList;
914
915        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
916
917        if self.position_index.is_empty() {
918            return Ok((Vec::new(), position_offsets));
919        }
920
921        // Collect and sort entries by key
922        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
923            .position_index
924            .iter()
925            .map(|(term_key, pos_list)| {
926                let term_str = self.term_interner.resolve(&term_key.term);
927                let mut key = Vec::with_capacity(4 + term_str.len());
928                key.extend_from_slice(&term_key.field.to_le_bytes());
929                key.extend_from_slice(term_str.as_bytes());
930                (key, pos_list)
931            })
932            .collect();
933
934        entries.sort_by(|a, b| a.0.cmp(&b.0));
935
936        // Serialize all position lists and track offsets
937        let mut output = Vec::new();
938
939        for (key, pos_builder) in entries {
940            // Convert builder to PositionPostingList
941            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
942            for (doc_id, positions) in &pos_builder.postings {
943                pos_list.push(*doc_id, positions.clone());
944            }
945
946            // Serialize and track offset
947            let offset = output.len() as u64;
948            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
949            let len = (output.len() as u64 - offset) as u32;
950
951            position_offsets.insert(key, (offset, len));
952        }
953
954        Ok((output, position_offsets))
955    }
956
957    /// Build postings from inverted index
958    ///
959    /// Uses parallel processing to serialize posting lists concurrently.
960    /// Position offsets are looked up and embedded in TermInfo.
961    fn build_postings(
962        &mut self,
963        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
964    ) -> Result<(Vec<u8>, Vec<u8>)> {
965        // Phase 1: Collect and sort term keys (parallel key generation)
966        // Key format: field_id (4 bytes) + term bytes
967        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
968            .inverted_index
969            .iter()
970            .map(|(term_key, posting_list)| {
971                let term_str = self.term_interner.resolve(&term_key.term);
972                let mut key = Vec::with_capacity(4 + term_str.len());
973                key.extend_from_slice(&term_key.field.to_le_bytes());
974                key.extend_from_slice(term_str.as_bytes());
975                (key, posting_list)
976            })
977            .collect();
978
979        // Sort by key for SSTable ordering
980        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
981
982        // Phase 2: Parallel serialization of posting lists
983        // Each term's posting list is serialized independently
984        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
985            .into_par_iter()
986            .map(|(key, posting_builder)| {
987                // Build posting list from in-memory postings
988                let mut full_postings = PostingList::with_capacity(posting_builder.len());
989                for p in &posting_builder.postings {
990                    full_postings.push(p.doc_id, p.term_freq as u32);
991                }
992
993                // Build term info
994                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
995                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
996
997                // Don't inline if term has positions (inline format doesn't support position offsets)
998                let has_positions = position_offsets.contains_key(&key);
999                let result = if !has_positions
1000                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1001                {
1002                    SerializedPosting::Inline(inline)
1003                } else {
1004                    // Serialize to local buffer
1005                    let mut posting_bytes = Vec::new();
1006                    let block_list =
1007                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
1008                            .expect("BlockPostingList creation failed");
1009                    block_list
1010                        .serialize(&mut posting_bytes)
1011                        .expect("BlockPostingList serialization failed");
1012                    SerializedPosting::External {
1013                        bytes: posting_bytes,
1014                        doc_count: full_postings.doc_count(),
1015                    }
1016                };
1017
1018                (key, result)
1019            })
1020            .collect();
1021
1022        // Phase 3: Sequential assembly (must be sequential for offset calculation)
1023        let mut term_dict = Vec::new();
1024        let mut postings = Vec::new();
1025        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1026
1027        for (key, serialized_posting) in serialized {
1028            let term_info = match serialized_posting {
1029                SerializedPosting::Inline(info) => info,
1030                SerializedPosting::External { bytes, doc_count } => {
1031                    let posting_offset = postings.len() as u64;
1032                    let posting_len = bytes.len() as u32;
1033                    postings.extend_from_slice(&bytes);
1034
1035                    // Look up position offset for this term
1036                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1037                        TermInfo::external_with_positions(
1038                            posting_offset,
1039                            posting_len,
1040                            doc_count,
1041                            pos_offset,
1042                            pos_len,
1043                        )
1044                    } else {
1045                        TermInfo::external(posting_offset, posting_len, doc_count)
1046                    }
1047                }
1048            };
1049
1050            writer.insert(&key, &term_info)?;
1051        }
1052
1053        writer.finish()?;
1054        Ok((term_dict, postings))
1055    }
1056
1057    /// Build document store from streamed temp file (static method for parallel execution)
1058    ///
1059    /// Uses parallel processing to deserialize documents concurrently.
1060    fn build_store_parallel(
1061        store_path: &PathBuf,
1062        schema: &Schema,
1063        num_compression_threads: usize,
1064        compression_level: CompressionLevel,
1065    ) -> Result<Vec<u8>> {
1066        use super::store::EagerParallelStoreWriter;
1067
1068        let file = File::open(store_path)?;
1069        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1070
1071        // Phase 1: Parse document boundaries (sequential, fast)
1072        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1073        let mut offset = 0usize;
1074        while offset + 4 <= mmap.len() {
1075            let doc_len = u32::from_le_bytes([
1076                mmap[offset],
1077                mmap[offset + 1],
1078                mmap[offset + 2],
1079                mmap[offset + 3],
1080            ]) as usize;
1081            offset += 4;
1082
1083            if offset + doc_len > mmap.len() {
1084                break;
1085            }
1086
1087            doc_ranges.push((offset, doc_len));
1088            offset += doc_len;
1089        }
1090
1091        // Phase 2: Parallel deserialization of documents
1092        let docs: Vec<Document> = doc_ranges
1093            .into_par_iter()
1094            .filter_map(|(start, len)| {
1095                let doc_bytes = &mmap[start..start + len];
1096                super::store::deserialize_document(doc_bytes, schema).ok()
1097            })
1098            .collect();
1099
1100        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
1101        let mut store_data = Vec::new();
1102        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1103            &mut store_data,
1104            num_compression_threads,
1105            compression_level,
1106        );
1107
1108        for doc in &docs {
1109            store_writer.store(doc, schema)?;
1110        }
1111
1112        store_writer.finish()?;
1113        Ok(store_data)
1114    }
1115}
1116
1117impl Drop for SegmentBuilder {
1118    fn drop(&mut self) {
1119        // Cleanup temp files on drop
1120        let _ = std::fs::remove_file(&self.store_path);
1121    }
1122}