Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::path::PathBuf;
20
21use hashbrown::HashMap;
22use lasso::{Rodeo, Spur};
23use rayon::prelude::*;
24use rustc_hash::FxHashMap;
25
26use crate::compression::CompressionLevel;
27
28use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
29use crate::directories::{Directory, DirectoryWriter};
30use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
31use crate::structures::{PostingList, SSTableWriter, TermInfo};
32use crate::tokenizer::BoxedTokenizer;
33use crate::{DocId, Result};
34
35use posting::{
36    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
37};
38use vectors::{DenseVectorBuilder, SparseVectorBuilder};
39
40// Re-export from vector_data for backwards compatibility
41pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
42
43/// Size of the document store buffer before writing to disk
44const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
45
46/// Segment builder with optimized memory usage
47///
48/// Features:
49/// - Streams documents to disk immediately (no in-memory document storage)
50/// - Uses string interning for terms (reduced allocations)
51/// - Uses hashbrown HashMap (faster than BTreeMap)
52pub struct SegmentBuilder {
53    schema: Schema,
54    config: SegmentBuilderConfig,
55    tokenizers: FxHashMap<Field, BoxedTokenizer>,
56
57    /// String interner for terms - O(1) lookup and deduplication
58    term_interner: Rodeo,
59
60    /// Inverted index: term key -> posting list
61    inverted_index: HashMap<TermKey, PostingListBuilder>,
62
63    /// Streaming document store writer
64    store_file: BufWriter<File>,
65    store_path: PathBuf,
66
67    /// Document count
68    next_doc_id: DocId,
69
70    /// Per-field statistics for BM25F
71    field_stats: FxHashMap<u32, FieldStats>,
72
73    /// Per-document field lengths stored compactly
74    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
75    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
76    doc_field_lengths: Vec<u32>,
77    num_indexed_fields: usize,
78    field_to_slot: FxHashMap<u32, usize>,
79
80    /// Reusable buffer for per-document term frequency aggregation
81    /// Avoids allocating a new hashmap for each document
82    local_tf_buffer: FxHashMap<Spur, u32>,
83
84    /// Reusable buffer for tokenization to avoid per-token String allocations
85    token_buffer: String,
86
87    /// Dense vector storage per field: field -> (doc_ids, vectors)
88    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
89    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
90
91    /// Sparse vector storage per field: field -> SparseVectorBuilder
92    /// Uses proper BlockSparsePostingList with configurable quantization
93    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
94
95    /// Position index for fields with positions enabled
96    /// term key -> position posting list
97    position_index: HashMap<TermKey, PositionPostingListBuilder>,
98
99    /// Fields that have position tracking enabled, with their mode
100    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
101
102    /// Current element ordinal for multi-valued fields (reset per document)
103    current_element_ordinal: FxHashMap<u32, u32>,
104
105    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
106    estimated_memory: usize,
107}
108
109impl SegmentBuilder {
110    /// Create a new segment builder
111    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
112        let segment_id = uuid::Uuid::new_v4();
113        let store_path = config
114            .temp_dir
115            .join(format!("hermes_store_{}.tmp", segment_id));
116
117        let store_file = BufWriter::with_capacity(
118            STORE_BUFFER_SIZE,
119            OpenOptions::new()
120                .create(true)
121                .write(true)
122                .truncate(true)
123                .open(&store_path)?,
124        );
125
126        // Count indexed fields for compact field length storage
127        // Also track which fields have position recording enabled
128        let mut num_indexed_fields = 0;
129        let mut field_to_slot = FxHashMap::default();
130        let mut position_enabled_fields = FxHashMap::default();
131        for (field, entry) in schema.fields() {
132            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
133                field_to_slot.insert(field.0, num_indexed_fields);
134                num_indexed_fields += 1;
135                if entry.positions.is_some() {
136                    position_enabled_fields.insert(field.0, entry.positions);
137                }
138            }
139        }
140
141        Ok(Self {
142            schema,
143            tokenizers: FxHashMap::default(),
144            term_interner: Rodeo::new(),
145            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
146            store_file,
147            store_path,
148            next_doc_id: 0,
149            field_stats: FxHashMap::default(),
150            doc_field_lengths: Vec::new(),
151            num_indexed_fields,
152            field_to_slot,
153            local_tf_buffer: FxHashMap::default(),
154            token_buffer: String::with_capacity(64),
155            config,
156            dense_vectors: FxHashMap::default(),
157            sparse_vectors: FxHashMap::default(),
158            position_index: HashMap::new(),
159            position_enabled_fields,
160            current_element_ordinal: FxHashMap::default(),
161            estimated_memory: 0,
162        })
163    }
164
165    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
166        self.tokenizers.insert(field, tokenizer);
167    }
168
169    pub fn num_docs(&self) -> u32 {
170        self.next_doc_id
171    }
172
173    /// Fast O(1) memory estimate - updated incrementally during indexing
174    #[inline]
175    pub fn estimated_memory_bytes(&self) -> usize {
176        self.estimated_memory
177    }
178
179    /// Get current statistics for debugging performance (expensive - iterates all data)
180    pub fn stats(&self) -> SegmentBuilderStats {
181        use std::mem::size_of;
182
183        let postings_in_memory: usize =
184            self.inverted_index.values().map(|p| p.postings.len()).sum();
185
186        // Size constants computed from actual types
187        let compact_posting_size = size_of::<CompactPosting>();
188        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
189        let term_key_size = size_of::<TermKey>();
190        let posting_builder_size = size_of::<PostingListBuilder>();
191        let spur_size = size_of::<lasso::Spur>();
192        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
193
194        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
195        // Measured: ~(key_size + value_size + 8) per entry on average
196        let hashmap_entry_base_overhead = 8usize;
197
198        // FxHashMap uses same layout as hashbrown
199        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
200
201        // Postings memory
202        let postings_bytes: usize = self
203            .inverted_index
204            .values()
205            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
206            .sum();
207
208        // Inverted index overhead
209        let index_overhead_bytes = self.inverted_index.len()
210            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
211
212        // Term interner: Rodeo stores strings + metadata
213        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
214        let interner_arena_overhead = 2 * size_of::<usize>();
215        let avg_term_len = 8; // Estimated average term length
216        let interner_bytes =
217            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
218
219        // Doc field lengths
220        let field_lengths_bytes =
221            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
222
223        // Dense vectors
224        let mut dense_vectors_bytes: usize = 0;
225        let mut dense_vector_count: usize = 0;
226        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
227        for b in self.dense_vectors.values() {
228            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
229                + b.doc_ids.capacity() * doc_id_ordinal_size
230                + 2 * vec_overhead; // Two Vecs
231            dense_vector_count += b.doc_ids.len();
232        }
233
234        // Local buffers
235        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
236        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
237
238        // Sparse vectors
239        let mut sparse_vectors_bytes: usize = 0;
240        for builder in self.sparse_vectors.values() {
241            for postings in builder.postings.values() {
242                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
243            }
244            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
245            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
246            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
247        }
248        // Outer FxHashMap overhead
249        let outer_sparse_entry_size =
250            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
251        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
252
253        // Position index
254        let mut position_index_bytes: usize = 0;
255        for pos_builder in self.position_index.values() {
256            for (_, positions) in &pos_builder.postings {
257                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
258            }
259            // Vec<(DocId, Vec<u32>)> entry size
260            let pos_entry_size = size_of::<DocId>() + vec_overhead;
261            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
262        }
263        // HashMap overhead for position_index
264        let pos_index_entry_size =
265            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
266        position_index_bytes += self.position_index.len() * pos_index_entry_size;
267
268        let estimated_memory_bytes = postings_bytes
269            + index_overhead_bytes
270            + interner_bytes
271            + field_lengths_bytes
272            + dense_vectors_bytes
273            + local_tf_buffer_bytes
274            + sparse_vectors_bytes
275            + position_index_bytes;
276
277        let memory_breakdown = MemoryBreakdown {
278            postings_bytes,
279            index_overhead_bytes,
280            interner_bytes,
281            field_lengths_bytes,
282            dense_vectors_bytes,
283            dense_vector_count,
284            sparse_vectors_bytes,
285            position_index_bytes,
286        };
287
288        SegmentBuilderStats {
289            num_docs: self.next_doc_id,
290            unique_terms: self.inverted_index.len(),
291            postings_in_memory,
292            interned_strings: self.term_interner.len(),
293            doc_field_lengths_size: self.doc_field_lengths.len(),
294            estimated_memory_bytes,
295            memory_breakdown,
296        }
297    }
298
299    /// Add a document - streams to disk immediately
300    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
301        let doc_id = self.next_doc_id;
302        self.next_doc_id += 1;
303
304        // Initialize field lengths for this document
305        let base_idx = self.doc_field_lengths.len();
306        self.doc_field_lengths
307            .resize(base_idx + self.num_indexed_fields, 0);
308
309        // Reset element ordinals for this document (for multi-valued fields)
310        self.current_element_ordinal.clear();
311
312        for (field, value) in doc.field_values() {
313            let entry = self.schema.get_field_entry(*field);
314            if entry.is_none() || !entry.unwrap().indexed {
315                continue;
316            }
317
318            let entry = entry.unwrap();
319            match (&entry.field_type, value) {
320                (FieldType::Text, FieldValue::Text(text)) => {
321                    // Get current element ordinal for multi-valued fields
322                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
323                    let token_count =
324                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
325                    // Increment element ordinal for next value of this field
326                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
327
328                    // Update field statistics
329                    let stats = self.field_stats.entry(field.0).or_default();
330                    stats.total_tokens += token_count as u64;
331                    stats.doc_count += 1;
332
333                    // Store field length compactly
334                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
335                        self.doc_field_lengths[base_idx + slot] = token_count;
336                    }
337                }
338                (FieldType::U64, FieldValue::U64(v)) => {
339                    self.index_numeric_field(*field, doc_id, *v)?;
340                }
341                (FieldType::I64, FieldValue::I64(v)) => {
342                    self.index_numeric_field(*field, doc_id, *v as u64)?;
343                }
344                (FieldType::F64, FieldValue::F64(v)) => {
345                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
346                }
347                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
348                    // Get current element ordinal for multi-valued fields
349                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
350                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
351                    // Increment element ordinal for next value of this field
352                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
353                }
354                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
355                    // Get current element ordinal for multi-valued fields
356                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
357                    self.index_sparse_vector_field(
358                        *field,
359                        doc_id,
360                        element_ordinal as u16,
361                        entries,
362                    )?;
363                    // Increment element ordinal for next value of this field
364                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
365                }
366                _ => {}
367            }
368        }
369
370        // Stream document to disk immediately
371        self.write_document_to_store(&doc)?;
372
373        Ok(doc_id)
374    }
375
376    /// Index a text field using interned terms
377    ///
378    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
379    /// Instead of allocating a String per token, we:
380    /// 1. Iterate over whitespace-split words
381    /// 2. Build lowercase token in a reusable buffer
382    /// 3. Intern directly from the buffer
383    ///
384    /// If position recording is enabled for this field, also records token positions
385    /// encoded as (element_ordinal << 20) | token_position.
386    fn index_text_field(
387        &mut self,
388        field: Field,
389        doc_id: DocId,
390        text: &str,
391        element_ordinal: u32,
392    ) -> Result<u32> {
393        use crate::dsl::PositionMode;
394
395        let field_id = field.0;
396        let position_mode = self
397            .position_enabled_fields
398            .get(&field_id)
399            .copied()
400            .flatten();
401
402        // Phase 1: Aggregate term frequencies within this document
403        // Also collect positions if enabled
404        // Reuse buffer to avoid allocations
405        self.local_tf_buffer.clear();
406
407        // For position tracking: term -> list of positions in this text
408        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
409
410        let mut token_position = 0u32;
411
412        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
413        for word in text.split_whitespace() {
414            // Build lowercase token in reusable buffer
415            self.token_buffer.clear();
416            for c in word.chars() {
417                if c.is_alphanumeric() {
418                    for lc in c.to_lowercase() {
419                        self.token_buffer.push(lc);
420                    }
421                }
422            }
423
424            if self.token_buffer.is_empty() {
425                continue;
426            }
427
428            // Intern the term directly from buffer - O(1) amortized
429            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
430            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
431
432            // Record position based on mode
433            if let Some(mode) = position_mode {
434                let encoded_pos = match mode {
435                    // Ordinal only: just store element ordinal (token position = 0)
436                    PositionMode::Ordinal => element_ordinal << 20,
437                    // Token position only: just store token position (ordinal = 0)
438                    PositionMode::TokenPosition => token_position,
439                    // Full: encode both
440                    PositionMode::Full => (element_ordinal << 20) | token_position,
441                };
442                local_positions
443                    .entry(term_spur)
444                    .or_default()
445                    .push(encoded_pos);
446            }
447
448            token_position += 1;
449        }
450
451        // Phase 2: Insert aggregated terms into inverted index
452        // Now we only do one inverted_index lookup per unique term in doc
453        for (&term_spur, &tf) in &self.local_tf_buffer {
454            let term_key = TermKey {
455                field: field_id,
456                term: term_spur,
457            };
458
459            let is_new_term = !self.inverted_index.contains_key(&term_key);
460            let posting = self
461                .inverted_index
462                .entry(term_key)
463                .or_insert_with(PostingListBuilder::new);
464            posting.add(doc_id, tf);
465
466            // Incremental memory tracking
467            use std::mem::size_of;
468            self.estimated_memory += size_of::<CompactPosting>();
469            if is_new_term {
470                // HashMap entry overhead + PostingListBuilder + Vec header
471                self.estimated_memory +=
472                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
473            }
474
475            // Add positions if enabled
476            if position_mode.is_some()
477                && let Some(positions) = local_positions.get(&term_spur)
478            {
479                let pos_posting = self
480                    .position_index
481                    .entry(term_key)
482                    .or_insert_with(PositionPostingListBuilder::new);
483                for &pos in positions {
484                    pos_posting.add_position(doc_id, pos);
485                }
486            }
487        }
488
489        Ok(token_position)
490    }
491
492    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
493        // For numeric fields, we use a special encoding
494        let term_str = format!("__num_{}", value);
495        let term_spur = self.term_interner.get_or_intern(&term_str);
496
497        let term_key = TermKey {
498            field: field.0,
499            term: term_spur,
500        };
501
502        let posting = self
503            .inverted_index
504            .entry(term_key)
505            .or_insert_with(PostingListBuilder::new);
506        posting.add(doc_id, 1);
507
508        Ok(())
509    }
510
511    /// Index a dense vector field with ordinal tracking
512    fn index_dense_vector_field(
513        &mut self,
514        field: Field,
515        doc_id: DocId,
516        ordinal: u16,
517        vector: &[f32],
518    ) -> Result<()> {
519        let dim = vector.len();
520
521        let builder = self
522            .dense_vectors
523            .entry(field.0)
524            .or_insert_with(|| DenseVectorBuilder::new(dim));
525
526        // Verify dimension consistency
527        if builder.dim != dim && builder.len() > 0 {
528            return Err(crate::Error::Schema(format!(
529                "Dense vector dimension mismatch: expected {}, got {}",
530                builder.dim, dim
531            )));
532        }
533
534        builder.add(doc_id, ordinal, vector);
535
536        // Incremental memory tracking
537        use std::mem::{size_of, size_of_val};
538        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
539
540        Ok(())
541    }
542
543    /// Index a sparse vector field using dedicated sparse posting lists
544    ///
545    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
546    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
547    ///
548    /// Weights below the configured `weight_threshold` are not indexed.
549    fn index_sparse_vector_field(
550        &mut self,
551        field: Field,
552        doc_id: DocId,
553        ordinal: u16,
554        entries: &[(u32, f32)],
555    ) -> Result<()> {
556        // Get weight threshold from field config (default 0.0 = no filtering)
557        let weight_threshold = self
558            .schema
559            .get_field_entry(field)
560            .and_then(|entry| entry.sparse_vector_config.as_ref())
561            .map(|config| config.weight_threshold)
562            .unwrap_or(0.0);
563
564        let builder = self
565            .sparse_vectors
566            .entry(field.0)
567            .or_insert_with(SparseVectorBuilder::new);
568
569        for &(dim_id, weight) in entries {
570            // Skip weights below threshold
571            if weight.abs() < weight_threshold {
572                continue;
573            }
574
575            // Incremental memory tracking
576            use std::mem::size_of;
577            let is_new_dim = !builder.postings.contains_key(&dim_id);
578            builder.add(dim_id, doc_id, ordinal, weight);
579            self.estimated_memory += size_of::<(DocId, u16, f32)>();
580            if is_new_dim {
581                // HashMap entry overhead + Vec header
582                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
583            }
584        }
585
586        Ok(())
587    }
588
589    /// Write document to streaming store
590    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
591        use byteorder::{LittleEndian, WriteBytesExt};
592
593        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
594
595        self.store_file
596            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
597        self.store_file.write_all(&doc_bytes)?;
598
599        Ok(())
600    }
601
602    /// Build the final segment
603    pub async fn build<D: Directory + DirectoryWriter>(
604        mut self,
605        dir: &D,
606        segment_id: SegmentId,
607    ) -> Result<SegmentMeta> {
608        // Flush any buffered data
609        self.store_file.flush()?;
610
611        let files = SegmentFiles::new(segment_id.0);
612
613        // Build positions FIRST to get offsets for TermInfo
614        let (positions_data, position_offsets) = self.build_positions_file()?;
615
616        // Extract data needed for parallel processing
617        let store_path = self.store_path.clone();
618        let schema = self.schema.clone();
619        let num_compression_threads = self.config.num_compression_threads;
620        let compression_level = self.config.compression_level;
621
622        // Build postings and document store in parallel
623        let (postings_result, store_result) = rayon::join(
624            || self.build_postings(&position_offsets),
625            || {
626                Self::build_store_parallel(
627                    &store_path,
628                    &schema,
629                    num_compression_threads,
630                    compression_level,
631                )
632            },
633        );
634
635        let (term_dict_data, postings_data) = postings_result?;
636        let store_data = store_result?;
637
638        // Write to directory
639        dir.write(&files.term_dict, &term_dict_data).await?;
640        dir.write(&files.postings, &postings_data).await?;
641        dir.write(&files.store, &store_data).await?;
642
643        // Write positions file (data only, offsets are in TermInfo)
644        if !positions_data.is_empty() {
645            dir.write(&files.positions, &positions_data).await?;
646        }
647
648        // Build and write dense vector indexes (RaBitQ) - all in one file
649        if !self.dense_vectors.is_empty() {
650            let vectors_data = self.build_vectors_file()?;
651            if !vectors_data.is_empty() {
652                dir.write(&files.vectors, &vectors_data).await?;
653            }
654        }
655
656        // Build and write sparse vector posting lists
657        if !self.sparse_vectors.is_empty() {
658            let sparse_data = self.build_sparse_file()?;
659            if !sparse_data.is_empty() {
660                dir.write(&files.sparse, &sparse_data).await?;
661            }
662        }
663
664        let meta = SegmentMeta {
665            id: segment_id.0,
666            num_docs: self.next_doc_id,
667            field_stats: self.field_stats.clone(),
668        };
669
670        dir.write(&files.meta, &meta.serialize()?).await?;
671
672        // Cleanup temp files
673        let _ = std::fs::remove_file(&self.store_path);
674
675        Ok(meta)
676    }
677
678    /// Build unified vectors file containing all dense vector indexes
679    ///
680    /// File format:
681    /// - Header: num_fields (u32)
682    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
683    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
684    fn build_vectors_file(&self) -> Result<Vec<u8>> {
685        use byteorder::{LittleEndian, WriteBytesExt};
686
687        // Build all indexes first: (field_id, index_type, data)
688        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
689
690        for (&field_id, builder) in &self.dense_vectors {
691            if builder.len() == 0 {
692                continue;
693            }
694
695            let field = crate::dsl::Field(field_id);
696
697            // Get dense vector config
698            let dense_config = self
699                .schema
700                .get_field_entry(field)
701                .and_then(|e| e.dense_vector_config.as_ref());
702
703            // Get vectors, potentially trimmed for matryoshka/MRL indexing
704            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
705            let vectors = if index_dim < builder.dim {
706                // Trim vectors to mrl_dim for indexing
707                builder.get_vectors_trimmed(index_dim)
708            } else {
709                builder.get_vectors()
710            };
711
712            // During normal indexing, segments always store Flat (raw vectors).
713            // ANN indexes are built at index-level via build_vector_index() which
714            // trains centroids/codebooks once from all vectors and triggers rebuild.
715            let flat_data = FlatVectorData {
716                dim: index_dim,
717                vectors: vectors.clone(),
718                doc_ids: builder.doc_ids.clone(),
719            };
720            let index_bytes = serde_json::to_vec(&flat_data)
721                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
722            let index_type = 3u8; // 3 = Flat
723
724            field_indexes.push((field_id, index_type, index_bytes));
725        }
726
727        if field_indexes.is_empty() {
728            return Ok(Vec::new());
729        }
730
731        // Sort by field_id for consistent ordering
732        field_indexes.sort_by_key(|(id, _, _)| *id);
733
734        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
735        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
736
737        // Build output
738        let mut output = Vec::new();
739
740        // Write number of fields
741        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
742
743        // Calculate offsets and write header entries
744        let mut current_offset = header_size as u64;
745        for (field_id, index_type, data) in &field_indexes {
746            output.write_u32::<LittleEndian>(*field_id)?;
747            output.write_u8(*index_type)?;
748            output.write_u64::<LittleEndian>(current_offset)?;
749            output.write_u64::<LittleEndian>(data.len() as u64)?;
750            current_offset += data.len() as u64;
751        }
752
753        // Write data
754        for (_, _, data) in field_indexes {
755            output.extend_from_slice(&data);
756        }
757
758        Ok(output)
759    }
760
761    /// Build sparse vectors file containing BlockSparsePostingList per field/dimension
762    ///
763    /// File format (direct-indexed table for O(1) dimension lookup):
764    /// - Header: num_fields (u32)
765    /// - For each field:
766    ///   - field_id (u32)
767    ///   - quantization (u8)
768    ///   - max_dim_id (u32)          ← highest dimension ID + 1 (table size)
769    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed by dim_id
770    ///     (offset=0, length=0 means dimension not present)
771    /// - Data: concatenated serialized BlockSparsePostingList
772    fn build_sparse_file(&self) -> Result<Vec<u8>> {
773        use crate::structures::{BlockSparsePostingList, WeightQuantization};
774        use byteorder::{LittleEndian, WriteBytesExt};
775
776        if self.sparse_vectors.is_empty() {
777            return Ok(Vec::new());
778        }
779
780        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> serialized_bytes)
781        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
782        let mut field_data: Vec<SparseFieldData> = Vec::new();
783
784        for (&field_id, builder) in &self.sparse_vectors {
785            if builder.is_empty() {
786                continue;
787            }
788
789            let field = crate::dsl::Field(field_id);
790
791            // Get config from field
792            let sparse_config = self
793                .schema
794                .get_field_entry(field)
795                .and_then(|e| e.sparse_vector_config.as_ref());
796
797            let quantization = sparse_config
798                .map(|c| c.weight_quantization)
799                .unwrap_or(WeightQuantization::Float32);
800
801            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
802
803            // Find max dimension ID
804            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);
805
806            // Build BlockSparsePostingList for each dimension
807            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
808
809            for (&dim_id, postings) in &builder.postings {
810                // Sort postings by doc_id, then by ordinal
811                let mut sorted_postings = postings.clone();
812                sorted_postings.sort_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
813
814                // Build BlockSparsePostingList with configured block size
815                let block_list = BlockSparsePostingList::from_postings_with_block_size(
816                    &sorted_postings,
817                    quantization,
818                    block_size,
819                )
820                .map_err(crate::Error::Io)?;
821
822                // Serialize
823                let mut bytes = Vec::new();
824                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
825
826                dim_bytes.insert(dim_id, bytes);
827            }
828
829            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
830        }
831
832        if field_data.is_empty() {
833            return Ok(Vec::new());
834        }
835
836        // Sort by field_id
837        field_data.sort_by_key(|(id, _, _, _)| *id);
838
839        // Calculate header size
840        // Header: num_fields (4)
841        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
842        let mut header_size = 4u64;
843        for (_, _, max_dim_id, _) in &field_data {
844            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
845            header_size += (*max_dim_id as u64) * 12; // table entries: (offset: u64, length: u32)
846        }
847
848        // Build output
849        let mut output = Vec::new();
850
851        // Write num_fields
852        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
853
854        // Track current data offset (after all headers)
855        let mut current_offset = header_size;
856
857        // First, collect all data bytes in order and build offset tables
858        let mut all_data: Vec<u8> = Vec::new();
859        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
860
861        for (_, _, max_dim_id, dim_bytes) in &field_data {
862            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
863
864            // Process dimensions in order
865            for dim_id in 0..*max_dim_id {
866                if let Some(bytes) = dim_bytes.get(&dim_id) {
867                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
868                    current_offset += bytes.len() as u64;
869                    all_data.extend_from_slice(bytes);
870                }
871                // else: table entry stays (0, 0) meaning dimension not present
872            }
873
874            field_tables.push(table);
875        }
876
877        // Write field headers and tables
878        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
879            output.write_u32::<LittleEndian>(*field_id)?;
880            output.write_u8(*quantization as u8)?;
881            output.write_u32::<LittleEndian>(*max_dim_id)?;
882
883            // Write table (direct indexed by dim_id)
884            for &(offset, length) in &field_tables[i] {
885                output.write_u64::<LittleEndian>(offset)?;
886                output.write_u32::<LittleEndian>(length)?;
887            }
888        }
889
890        // Write data
891        output.extend_from_slice(&all_data);
892
893        Ok(output)
894    }
895
896    /// Build positions file for phrase queries
897    ///
898    /// File format:
899    /// - Data only: concatenated serialized PositionPostingList
900    /// - Position offsets are stored in TermInfo (no separate header needed)
901    ///
902    /// Returns: (positions_data, term_key -> (offset, len) mapping)
903    #[allow(clippy::type_complexity)]
904    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
905        use crate::structures::PositionPostingList;
906
907        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
908
909        if self.position_index.is_empty() {
910            return Ok((Vec::new(), position_offsets));
911        }
912
913        // Collect and sort entries by key
914        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
915            .position_index
916            .iter()
917            .map(|(term_key, pos_list)| {
918                let term_str = self.term_interner.resolve(&term_key.term);
919                let mut key = Vec::with_capacity(4 + term_str.len());
920                key.extend_from_slice(&term_key.field.to_le_bytes());
921                key.extend_from_slice(term_str.as_bytes());
922                (key, pos_list)
923            })
924            .collect();
925
926        entries.sort_by(|a, b| a.0.cmp(&b.0));
927
928        // Serialize all position lists and track offsets
929        let mut output = Vec::new();
930
931        for (key, pos_builder) in entries {
932            // Convert builder to PositionPostingList
933            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
934            for (doc_id, positions) in &pos_builder.postings {
935                pos_list.push(*doc_id, positions.clone());
936            }
937
938            // Serialize and track offset
939            let offset = output.len() as u64;
940            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
941            let len = (output.len() as u64 - offset) as u32;
942
943            position_offsets.insert(key, (offset, len));
944        }
945
946        Ok((output, position_offsets))
947    }
948
949    /// Build postings from inverted index
950    ///
951    /// Uses parallel processing to serialize posting lists concurrently.
952    /// Position offsets are looked up and embedded in TermInfo.
953    fn build_postings(
954        &mut self,
955        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
956    ) -> Result<(Vec<u8>, Vec<u8>)> {
957        // Phase 1: Collect and sort term keys (parallel key generation)
958        // Key format: field_id (4 bytes) + term bytes
959        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
960            .inverted_index
961            .iter()
962            .map(|(term_key, posting_list)| {
963                let term_str = self.term_interner.resolve(&term_key.term);
964                let mut key = Vec::with_capacity(4 + term_str.len());
965                key.extend_from_slice(&term_key.field.to_le_bytes());
966                key.extend_from_slice(term_str.as_bytes());
967                (key, posting_list)
968            })
969            .collect();
970
971        // Sort by key for SSTable ordering
972        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
973
974        // Phase 2: Parallel serialization of posting lists
975        // Each term's posting list is serialized independently
976        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
977            .into_par_iter()
978            .map(|(key, posting_builder)| {
979                // Build posting list from in-memory postings
980                let mut full_postings = PostingList::with_capacity(posting_builder.len());
981                for p in &posting_builder.postings {
982                    full_postings.push(p.doc_id, p.term_freq as u32);
983                }
984
985                // Build term info
986                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
987                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
988
989                // Don't inline if term has positions (inline format doesn't support position offsets)
990                let has_positions = position_offsets.contains_key(&key);
991                let result = if !has_positions
992                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
993                {
994                    SerializedPosting::Inline(inline)
995                } else {
996                    // Serialize to local buffer
997                    let mut posting_bytes = Vec::new();
998                    let block_list =
999                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
1000                            .expect("BlockPostingList creation failed");
1001                    block_list
1002                        .serialize(&mut posting_bytes)
1003                        .expect("BlockPostingList serialization failed");
1004                    SerializedPosting::External {
1005                        bytes: posting_bytes,
1006                        doc_count: full_postings.doc_count(),
1007                    }
1008                };
1009
1010                (key, result)
1011            })
1012            .collect();
1013
1014        // Phase 3: Sequential assembly (must be sequential for offset calculation)
1015        let mut term_dict = Vec::new();
1016        let mut postings = Vec::new();
1017        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1018
1019        for (key, serialized_posting) in serialized {
1020            let term_info = match serialized_posting {
1021                SerializedPosting::Inline(info) => info,
1022                SerializedPosting::External { bytes, doc_count } => {
1023                    let posting_offset = postings.len() as u64;
1024                    let posting_len = bytes.len() as u32;
1025                    postings.extend_from_slice(&bytes);
1026
1027                    // Look up position offset for this term
1028                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1029                        TermInfo::external_with_positions(
1030                            posting_offset,
1031                            posting_len,
1032                            doc_count,
1033                            pos_offset,
1034                            pos_len,
1035                        )
1036                    } else {
1037                        TermInfo::external(posting_offset, posting_len, doc_count)
1038                    }
1039                }
1040            };
1041
1042            writer.insert(&key, &term_info)?;
1043        }
1044
1045        writer.finish()?;
1046        Ok((term_dict, postings))
1047    }
1048
1049    /// Build document store from streamed temp file (static method for parallel execution)
1050    ///
1051    /// Uses parallel processing to deserialize documents concurrently.
1052    fn build_store_parallel(
1053        store_path: &PathBuf,
1054        schema: &Schema,
1055        num_compression_threads: usize,
1056        compression_level: CompressionLevel,
1057    ) -> Result<Vec<u8>> {
1058        use super::store::EagerParallelStoreWriter;
1059
1060        let file = File::open(store_path)?;
1061        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1062
1063        // Phase 1: Parse document boundaries (sequential, fast)
1064        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1065        let mut offset = 0usize;
1066        while offset + 4 <= mmap.len() {
1067            let doc_len = u32::from_le_bytes([
1068                mmap[offset],
1069                mmap[offset + 1],
1070                mmap[offset + 2],
1071                mmap[offset + 3],
1072            ]) as usize;
1073            offset += 4;
1074
1075            if offset + doc_len > mmap.len() {
1076                break;
1077            }
1078
1079            doc_ranges.push((offset, doc_len));
1080            offset += doc_len;
1081        }
1082
1083        // Phase 2: Parallel deserialization of documents
1084        let docs: Vec<Document> = doc_ranges
1085            .into_par_iter()
1086            .filter_map(|(start, len)| {
1087                let doc_bytes = &mmap[start..start + len];
1088                super::store::deserialize_document(doc_bytes, schema).ok()
1089            })
1090            .collect();
1091
1092        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
1093        let mut store_data = Vec::new();
1094        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1095            &mut store_data,
1096            num_compression_threads,
1097            compression_level,
1098        );
1099
1100        for doc in &docs {
1101            store_writer.store(doc, schema)?;
1102        }
1103
1104        store_writer.finish()?;
1105        Ok(store_data)
1106    }
1107}
1108
1109impl Drop for SegmentBuilder {
1110    fn drop(&mut self) {
1111        // Cleanup temp files on drop
1112        let _ = std::fs::remove_file(&self.store_path);
1113    }
1114}