hermes_core/segment/builder/mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
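//! A minimal usage sketch (hedged: schema/document construction, the `Directory`
//! implementation, `segment_id`, and `SegmentBuilderConfig::default()` are
//! illustrative here and not defined in this module):
//!
//! ```ignore
//! let mut builder = SegmentBuilder::new(schema.clone(), SegmentBuilderConfig::default())?;
//! for doc in docs {
//!     builder.add_document(doc)?; // streamed to a temp store file immediately
//! }
//! // Callers can poll the O(1) incremental estimate to decide when to flush.
//! let _mem = builder.estimated_memory_bytes();
//! let meta = builder.build(&dir, segment_id, None).await?;
//! ```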
11mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::mem::size_of;
20use std::path::PathBuf;
21
22use hashbrown::HashMap;
23use lasso::{Rodeo, Spur};
24use rayon::prelude::*;
25use rustc_hash::FxHashMap;
26
27use crate::compression::CompressionLevel;
28
29use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
30use std::sync::Arc;
31
32use crate::directories::{Directory, DirectoryWriter};
33use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
34use crate::structures::{PostingList, SSTableWriter, TermInfo};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use posting::{
39    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
40};
41use vectors::{DenseVectorBuilder, SparseVectorBuilder};
42
43use super::vector_data::FlatVectorData;
44
45/// Size of the document store buffer before writing to disk
46const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
47
48/// Memory overhead per new term in the inverted index:
49/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
50const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
51
52/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
53const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
54
55/// Memory overhead per new term in the position index
56const NEW_POS_TERM_OVERHEAD: usize =
57    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
58
59/// Segment builder with optimized memory usage
60///
61/// Features:
62/// - Streams documents to disk immediately (no in-memory document storage)
63/// - Uses string interning for terms (reduced allocations)
64/// - Uses hashbrown HashMap (faster than BTreeMap)
65pub struct SegmentBuilder {
66    schema: Arc<Schema>,
67    config: SegmentBuilderConfig,
68    tokenizers: FxHashMap<Field, BoxedTokenizer>,
69
70    /// String interner for terms - O(1) lookup and deduplication
71    term_interner: Rodeo,
72
73    /// Inverted index: term key -> posting list
74    inverted_index: HashMap<TermKey, PostingListBuilder>,
75
76    /// Streaming document store writer
77    store_file: BufWriter<File>,
78    store_path: PathBuf,
79
80    /// Document count
81    next_doc_id: DocId,
82
83    /// Per-field statistics for BM25F
84    field_stats: FxHashMap<u32, FieldStats>,
85
86    /// Per-document field lengths stored compactly
87    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
88    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
89    doc_field_lengths: Vec<u32>,
90    num_indexed_fields: usize,
91    field_to_slot: FxHashMap<u32, usize>,
92
93    /// Reusable buffer for per-document term frequency aggregation
94    /// Avoids allocating a new hashmap for each document
95    local_tf_buffer: FxHashMap<Spur, u32>,
96
97    /// Reusable buffer for per-document position tracking (when positions enabled)
98    /// Avoids allocating a new hashmap for each text field per document
99    local_positions: FxHashMap<Spur, Vec<u32>>,
100
101    /// Reusable buffer for tokenization to avoid per-token String allocations
102    token_buffer: String,
103
104    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
105    numeric_buffer: String,
106
107    /// Dense vector storage per field: field -> (doc_ids, vectors)
108    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
109    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
110
111    /// Sparse vector storage per field: field -> SparseVectorBuilder
112    /// Uses proper BlockSparsePostingList with configurable quantization
113    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
114
115    /// Position index for fields with positions enabled
116    /// term key -> position posting list
117    position_index: HashMap<TermKey, PositionPostingListBuilder>,
118
119    /// Fields that have position tracking enabled, with their mode
120    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
121
122    /// Current element ordinal for multi-valued fields (reset per document)
123    current_element_ordinal: FxHashMap<u32, u32>,
124
125    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
126    estimated_memory: usize,
127
128    /// Reusable buffer for document serialization (avoids per-document allocation)
129    doc_serialize_buffer: Vec<u8>,
130}
131
132impl SegmentBuilder {
133    /// Create a new segment builder
134    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
135        let segment_id = uuid::Uuid::new_v4();
136        let store_path = config
137            .temp_dir
138            .join(format!("hermes_store_{}.tmp", segment_id));
139
140        let store_file = BufWriter::with_capacity(
141            STORE_BUFFER_SIZE,
142            OpenOptions::new()
143                .create(true)
144                .write(true)
145                .truncate(true)
146                .open(&store_path)?,
147        );
148
149        // Count indexed fields for compact field length storage
150        // Also track which fields have position recording enabled
151        let mut num_indexed_fields = 0;
152        let mut field_to_slot = FxHashMap::default();
153        let mut position_enabled_fields = FxHashMap::default();
154        for (field, entry) in schema.fields() {
155            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
156                field_to_slot.insert(field.0, num_indexed_fields);
157                num_indexed_fields += 1;
158                if entry.positions.is_some() {
159                    position_enabled_fields.insert(field.0, entry.positions);
160                }
161            }
162        }
163
164        Ok(Self {
165            schema,
166            tokenizers: FxHashMap::default(),
167            term_interner: Rodeo::new(),
168            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
169            store_file,
170            store_path,
171            next_doc_id: 0,
172            field_stats: FxHashMap::default(),
173            doc_field_lengths: Vec::new(),
174            num_indexed_fields,
175            field_to_slot,
176            local_tf_buffer: FxHashMap::default(),
177            local_positions: FxHashMap::default(),
178            token_buffer: String::with_capacity(64),
179            numeric_buffer: String::with_capacity(32),
180            config,
181            dense_vectors: FxHashMap::default(),
182            sparse_vectors: FxHashMap::default(),
183            position_index: HashMap::new(),
184            position_enabled_fields,
185            current_element_ordinal: FxHashMap::default(),
186            estimated_memory: 0,
187            doc_serialize_buffer: Vec::with_capacity(256),
188        })
189    }
190
191    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
192        self.tokenizers.insert(field, tokenizer);
193    }
194
195    /// Get the current element ordinal for a field and increment it.
196    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
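    /// Illustration: for a document carrying three values of one such field,
    /// successive calls return 0, 1, 2; `add_document` resets the counter for
    /// each new document via `current_element_ordinal.clear()`.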
197    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
198        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
199        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
200        ordinal
201    }
202
203    pub fn num_docs(&self) -> u32 {
204        self.next_doc_id
205    }
206
207    /// Fast O(1) memory estimate - updated incrementally during indexing
208    #[inline]
209    pub fn estimated_memory_bytes(&self) -> usize {
210        self.estimated_memory
211    }
212
213    /// Count total unique sparse dimensions across all fields
214    /// Number of distinct sparse dimensions per field, summed across all sparse vector fields
215        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
216    }
217
218    /// Get current statistics for debugging performance (expensive - iterates all data)
219    pub fn stats(&self) -> SegmentBuilderStats {
220        use std::mem::size_of;
221
222        let postings_in_memory: usize =
223            self.inverted_index.values().map(|p| p.postings.len()).sum();
224
225        // Size constants computed from actual types
226        let compact_posting_size = size_of::<CompactPosting>();
227        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
228        let term_key_size = size_of::<TermKey>();
229        let posting_builder_size = size_of::<PostingListBuilder>();
230        let spur_size = size_of::<lasso::Spur>();
231        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
232
233        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
234        // Measured: ~(key_size + value_size + 8) per entry on average
235        let hashmap_entry_base_overhead = 8usize;
236
237        // FxHashMap uses same layout as hashbrown
238        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
239
240        // Postings memory
241        let postings_bytes: usize = self
242            .inverted_index
243            .values()
244            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
245            .sum();
246
247        // Inverted index overhead
248        let index_overhead_bytes = self.inverted_index.len()
249            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
250
251        // Term interner: Rodeo stores strings + metadata
252        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
253        let interner_arena_overhead = 2 * size_of::<usize>();
254        let avg_term_len = 8; // Estimated average term length
255        let interner_bytes =
256            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
257
258        // Doc field lengths
259        let field_lengths_bytes =
260            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
261
262        // Dense vectors
263        let mut dense_vectors_bytes: usize = 0;
264        let mut dense_vector_count: usize = 0;
265        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
266        for b in self.dense_vectors.values() {
267            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
268                + b.doc_ids.capacity() * doc_id_ordinal_size
269                + 2 * vec_overhead; // Two Vecs
270            dense_vector_count += b.doc_ids.len();
271        }
272
273        // Local buffers
274        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
275        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
276
277        // Sparse vectors
278        let mut sparse_vectors_bytes: usize = 0;
279        for builder in self.sparse_vectors.values() {
280            for postings in builder.postings.values() {
281                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
282            }
283            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
284            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
285            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
286        }
287        // Outer FxHashMap overhead
288        let outer_sparse_entry_size =
289            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
290        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
291
292        // Position index
293        let mut position_index_bytes: usize = 0;
294        for pos_builder in self.position_index.values() {
295            for (_, positions) in &pos_builder.postings {
296                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
297            }
298            // Vec<(DocId, Vec<u32>)> entry size
299            let pos_entry_size = size_of::<DocId>() + vec_overhead;
300            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
301        }
302        // HashMap overhead for position_index
303        let pos_index_entry_size =
304            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
305        position_index_bytes += self.position_index.len() * pos_index_entry_size;
306
307        let estimated_memory_bytes = postings_bytes
308            + index_overhead_bytes
309            + interner_bytes
310            + field_lengths_bytes
311            + dense_vectors_bytes
312            + local_tf_buffer_bytes
313            + sparse_vectors_bytes
314            + position_index_bytes;
315
316        let memory_breakdown = MemoryBreakdown {
317            postings_bytes,
318            index_overhead_bytes,
319            interner_bytes,
320            field_lengths_bytes,
321            dense_vectors_bytes,
322            dense_vector_count,
323            sparse_vectors_bytes,
324            position_index_bytes,
325        };
326
327        SegmentBuilderStats {
328            num_docs: self.next_doc_id,
329            unique_terms: self.inverted_index.len(),
330            postings_in_memory,
331            interned_strings: self.term_interner.len(),
332            doc_field_lengths_size: self.doc_field_lengths.len(),
333            estimated_memory_bytes,
334            memory_breakdown,
335        }
336    }
337
338    /// Add a document - streams to disk immediately
339    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
340        let doc_id = self.next_doc_id;
341        self.next_doc_id += 1;
342
343        // Initialize field lengths for this document
344        let base_idx = self.doc_field_lengths.len();
345        self.doc_field_lengths
346            .resize(base_idx + self.num_indexed_fields, 0);
347        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
348
349        // Reset element ordinals for this document (for multi-valued fields)
350        self.current_element_ordinal.clear();
351
352        for (field, value) in doc.field_values() {
353            let Some(entry) = self.schema.get_field_entry(*field) else {
354                continue;
355            };
356
357            // Dense vectors are written to .vectors when indexed || stored
358            // Other field types require indexed
359            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
360                continue;
361            }
362
363            match (&entry.field_type, value) {
364                (FieldType::Text, FieldValue::Text(text)) => {
365                    let element_ordinal = self.next_element_ordinal(field.0);
366                    let token_count =
367                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
368
369                    let stats = self.field_stats.entry(field.0).or_default();
370                    stats.total_tokens += token_count as u64;
371                    if element_ordinal == 0 {
372                        stats.doc_count += 1;
373                    }
374
375                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
376                        self.doc_field_lengths[base_idx + slot] = token_count;
377                    }
378                }
379                (FieldType::U64, FieldValue::U64(v)) => {
380                    self.index_numeric_field(*field, doc_id, *v)?;
381                }
382                (FieldType::I64, FieldValue::I64(v)) => {
383                    self.index_numeric_field(*field, doc_id, *v as u64)?;
384                }
385                (FieldType::F64, FieldValue::F64(v)) => {
386                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
387                }
388                (FieldType::DenseVector, FieldValue::DenseVector(vec))
389                    if entry.indexed || entry.stored =>
390                {
391                    let ordinal = self.next_element_ordinal(field.0);
392                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
393                }
394                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
395                    let ordinal = self.next_element_ordinal(field.0);
396                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
397                }
398                _ => {}
399            }
400        }
401
402        // Stream document to disk immediately
403        self.write_document_to_store(&doc)?;
404
405        Ok(doc_id)
406    }
407
408    /// Index a text field using interned terms
409    ///
410    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
411    /// otherwise falls back to an inline zero-allocation path (split_whitespace
412    /// + lowercase + strip non-alphanumeric).
413    ///
414    /// If position recording is enabled for this field, also records token positions
415    /// encoded as (element_ordinal << 20) | token_position.
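    ///
    /// Worked example: with `element_ordinal = 2` and token position 5,
    /// `PositionMode::Full` encodes `(2 << 20) | 5 = 2_097_157`,
    /// `PositionMode::Ordinal` encodes `2 << 20 = 2_097_152`, and
    /// `PositionMode::TokenPosition` encodes `5`.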
416    fn index_text_field(
417        &mut self,
418        field: Field,
419        doc_id: DocId,
420        text: &str,
421        element_ordinal: u32,
422    ) -> Result<u32> {
423        use crate::dsl::PositionMode;
424
425        let field_id = field.0;
426        let position_mode = self
427            .position_enabled_fields
428            .get(&field_id)
429            .copied()
430            .flatten();
431
432        // Phase 1: Aggregate term frequencies within this document
433        // Also collect positions if enabled
434        // Reuse buffers to avoid allocations
435        self.local_tf_buffer.clear();
436        // Clear position Vecs in-place (keeps allocated capacity for reuse)
437        for v in self.local_positions.values_mut() {
438            v.clear();
439        }
440
441        let mut token_position = 0u32;
442
443        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
444        // The owned Vec<Token> is computed first so the immutable borrow of
445        // self.tokenizers ends before we mutate other fields.
446        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
447
448        if let Some(tokens) = custom_tokens {
449            // Custom tokenizer path
450            for token in &tokens {
451                let is_new_string = !self.term_interner.contains(&token.text);
452                let term_spur = self.term_interner.get_or_intern(&token.text);
453                if is_new_string {
454                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
455                }
456                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
457
458                if let Some(mode) = position_mode {
459                    let encoded_pos = match mode {
460                        PositionMode::Ordinal => element_ordinal << 20,
461                        PositionMode::TokenPosition => token.position,
462                        PositionMode::Full => (element_ordinal << 20) | token.position,
463                    };
464                    self.local_positions
465                        .entry(term_spur)
466                        .or_default()
467                        .push(encoded_pos);
468                }
469            }
470            token_position = tokens.len() as u32;
471        } else {
472            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
473            for word in text.split_whitespace() {
474                self.token_buffer.clear();
475                for c in word.chars() {
476                    if c.is_alphanumeric() {
477                        for lc in c.to_lowercase() {
478                            self.token_buffer.push(lc);
479                        }
480                    }
481                }
482
483                if self.token_buffer.is_empty() {
484                    continue;
485                }
486
487                let is_new_string = !self.term_interner.contains(&self.token_buffer);
488                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
489                if is_new_string {
490                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
491                }
492                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
493
494                if let Some(mode) = position_mode {
495                    let encoded_pos = match mode {
496                        PositionMode::Ordinal => element_ordinal << 20,
497                        PositionMode::TokenPosition => token_position,
498                        PositionMode::Full => (element_ordinal << 20) | token_position,
499                    };
500                    self.local_positions
501                        .entry(term_spur)
502                        .or_default()
503                        .push(encoded_pos);
504                }
505
506                token_position += 1;
507            }
508        }
509
510        // Phase 2: Insert aggregated terms into inverted index
511        // Only one inverted_index update per unique term in the doc, rather than one per token occurrence
512        for (&term_spur, &tf) in &self.local_tf_buffer {
513            let term_key = TermKey {
514                field: field_id,
515                term: term_spur,
516            };
517
518            let is_new_term = !self.inverted_index.contains_key(&term_key);
519            let posting = self
520                .inverted_index
521                .entry(term_key)
522                .or_insert_with(PostingListBuilder::new);
523            posting.add(doc_id, tf);
524
525            self.estimated_memory += size_of::<CompactPosting>();
526            if is_new_term {
527                self.estimated_memory += NEW_TERM_OVERHEAD;
528            }
529
530            if position_mode.is_some()
531                && let Some(positions) = self.local_positions.get(&term_spur)
532            {
533                let is_new_pos_term = !self.position_index.contains_key(&term_key);
534                let pos_posting = self
535                    .position_index
536                    .entry(term_key)
537                    .or_insert_with(PositionPostingListBuilder::new);
538                for &pos in positions {
539                    pos_posting.add_position(doc_id, pos);
540                }
541                self.estimated_memory += positions.len() * size_of::<u32>();
542                if is_new_pos_term {
543                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
544                }
545            }
546        }
547
548        Ok(token_position)
549    }
550
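    /// Index a numeric field as a single exact-match term.
    ///
    /// The value is encoded as the interned term `__num_{value}` (e.g. `42u64`
    /// becomes `__num_42`); `add_document` passes `i64` values cast to `u64`
    /// and `f64` values via `to_bits()`.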
551    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
552        use std::fmt::Write;
553
554        self.numeric_buffer.clear();
555        write!(self.numeric_buffer, "__num_{}", value).unwrap();
556        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
557        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);
558
559        let term_key = TermKey {
560            field: field.0,
561            term: term_spur,
562        };
563
564        let is_new_term = !self.inverted_index.contains_key(&term_key);
565        let posting = self
566            .inverted_index
567            .entry(term_key)
568            .or_insert_with(PostingListBuilder::new);
569        posting.add(doc_id, 1);
570
571        self.estimated_memory += size_of::<CompactPosting>();
572        if is_new_term {
573            self.estimated_memory += NEW_TERM_OVERHEAD;
574        }
575        if is_new_string {
576            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
577        }
578
579        Ok(())
580    }
581
582    /// Index a dense vector field with ordinal tracking
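    ///
    /// Vectors are appended to a flat `f32` buffer per field; a `Schema` error
    /// is returned if a vector's dimension differs from the field's existing
    /// vectors.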
583    fn index_dense_vector_field(
584        &mut self,
585        field: Field,
586        doc_id: DocId,
587        ordinal: u16,
588        vector: &[f32],
589    ) -> Result<()> {
590        let dim = vector.len();
591
592        let builder = self
593            .dense_vectors
594            .entry(field.0)
595            .or_insert_with(|| DenseVectorBuilder::new(dim));
596
597        // Verify dimension consistency
598        if builder.dim != dim && builder.len() > 0 {
599            return Err(crate::Error::Schema(format!(
600                "Dense vector dimension mismatch: expected {}, got {}",
601                builder.dim, dim
602            )));
603        }
604
605        builder.add(doc_id, ordinal, vector);
606
607        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
608
609        Ok(())
610    }
611
612    /// Index a sparse vector field using dedicated sparse posting lists
613    ///
614    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
615    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
616    ///
617    /// Weights below the configured `weight_threshold` are not indexed.
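    ///
    /// For example, with `weight_threshold = 0.1`, entries `(dim, 0.05)` and
    /// `(dim, -0.05)` are skipped while `(dim, -0.2)` is kept, since the
    /// comparison uses `weight.abs()`.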
618    fn index_sparse_vector_field(
619        &mut self,
620        field: Field,
621        doc_id: DocId,
622        ordinal: u16,
623        entries: &[(u32, f32)],
624    ) -> Result<()> {
625        // Get weight threshold from field config (default 0.0 = no filtering)
626        let weight_threshold = self
627            .schema
628            .get_field_entry(field)
629            .and_then(|entry| entry.sparse_vector_config.as_ref())
630            .map(|config| config.weight_threshold)
631            .unwrap_or(0.0);
632
633        let builder = self
634            .sparse_vectors
635            .entry(field.0)
636            .or_insert_with(SparseVectorBuilder::new);
637
638        for &(dim_id, weight) in entries {
639            // Skip weights below threshold
640            if weight.abs() < weight_threshold {
641                continue;
642            }
643
644            let is_new_dim = !builder.postings.contains_key(&dim_id);
645            builder.add(dim_id, doc_id, ordinal, weight);
646            self.estimated_memory += size_of::<(DocId, u16, f32)>();
647            if is_new_dim {
648                // HashMap entry overhead + Vec header
649                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
650            }
651        }
652
653        Ok(())
654    }
655
656    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
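    ///
    /// Each record is framed as `[len: u32 LE][doc_bytes]`; `build_store_streaming`
    /// later reads this framing back via mmap and hands the raw bytes to the
    /// store compressor.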
657    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
658        use byteorder::{LittleEndian, WriteBytesExt};
659
660        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
661
662        self.store_file
663            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
664        self.store_file.write_all(&self.doc_serialize_buffer)?;
665
666        Ok(())
667    }
668
669    /// Build the final segment
670    ///
671    /// Streams all data directly to disk via StreamingWriter to avoid buffering
672    /// entire serialized outputs in memory. Each phase consumes and drops its
673    /// source data before the next phase begins.
674    pub async fn build<D: Directory + DirectoryWriter>(
675        mut self,
676        dir: &D,
677        segment_id: SegmentId,
678        trained: Option<&super::TrainedVectorStructures>,
679    ) -> Result<SegmentMeta> {
680        // Flush any buffered data
681        self.store_file.flush()?;
682
683        let files = SegmentFiles::new(segment_id.0);
684
685        // Phase 1: Stream positions directly to disk (consumes position_index)
686        let position_index = std::mem::take(&mut self.position_index);
687        let position_offsets = if !position_index.is_empty() {
688            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
689            let offsets = Self::build_positions_streaming(
690                position_index,
691                &self.term_interner,
692                &mut *pos_writer,
693            )?;
694            pos_writer.finish()?;
695            offsets
696        } else {
697            FxHashMap::default()
698        };
699
700        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
701        // These are fully independent: different source data, different output files.
702        let inverted_index = std::mem::take(&mut self.inverted_index);
703        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
704        let store_path = self.store_path.clone();
705        let num_compression_threads = self.config.num_compression_threads;
706        let compression_level = self.config.compression_level;
707        let dense_vectors = std::mem::take(&mut self.dense_vectors);
708        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
709        let schema = &self.schema;
710
711        // Pre-create all streaming writers (async) before entering sync rayon scope
712        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
713        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
714        let mut store_writer = dir.streaming_writer(&files.store).await?;
715        let mut vectors_writer = if !dense_vectors.is_empty() {
716            Some(dir.streaming_writer(&files.vectors).await?)
717        } else {
718            None
719        };
720        let mut sparse_writer = if !sparse_vectors.is_empty() {
721            Some(dir.streaming_writer(&files.sparse).await?)
722        } else {
723            None
724        };
725
726        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
727            || {
728                rayon::join(
729                    || {
730                        Self::build_postings_streaming(
731                            inverted_index,
732                            term_interner,
733                            &position_offsets,
734                            &mut *term_dict_writer,
735                            &mut *postings_writer,
736                        )
737                    },
738                    || {
739                        Self::build_store_streaming(
740                            &store_path,
741                            num_compression_threads,
742                            compression_level,
743                            &mut *store_writer,
744                        )
745                    },
746                )
747            },
748            || {
749                rayon::join(
750                    || -> Result<()> {
751                        if let Some(ref mut w) = vectors_writer {
752                            Self::build_vectors_streaming(
753                                dense_vectors,
754                                schema,
755                                trained,
756                                &mut **w,
757                            )?;
758                        }
759                        Ok(())
760                    },
761                    || -> Result<()> {
762                        if let Some(ref mut w) = sparse_writer {
763                            Self::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
764                        }
765                        Ok(())
766                    },
767                )
768            },
769        );
770        postings_result?;
771        store_result?;
772        vectors_result?;
773        sparse_result?;
774        term_dict_writer.finish()?;
775        postings_writer.finish()?;
776        store_writer.finish()?;
777        if let Some(w) = vectors_writer {
778            w.finish()?;
779        }
780        if let Some(w) = sparse_writer {
781            w.finish()?;
782        }
783        drop(position_offsets);
784        drop(sparse_vectors);
785
786        let meta = SegmentMeta {
787            id: segment_id.0,
788            num_docs: self.next_doc_id,
789            field_stats: self.field_stats.clone(),
790        };
791
792        dir.write(&files.meta, &meta.serialize()?).await?;
793
794        // Cleanup temp files
795        let _ = std::fs::remove_file(&self.store_path);
796
797        Ok(meta)
798    }
799
800    /// Stream dense vectors directly to disk (zero-buffer for vector data).
801    ///
802    /// Computes sizes deterministically (no trial serialization needed), writes
803    /// a small header, then streams each field's raw f32 data directly to the writer.
804    fn build_vectors_streaming(
805        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
806        schema: &Schema,
807        trained: Option<&super::TrainedVectorStructures>,
808        writer: &mut dyn Write,
809    ) -> Result<()> {
810        use crate::dsl::{DenseVectorQuantization, VectorIndexType};
811
812        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
813            .into_iter()
814            .filter(|(_, b)| b.len() > 0)
815            .collect();
816        fields.sort_by_key(|(id, _)| *id);
817
818        if fields.is_empty() {
819            return Ok(());
820        }
821
822        // Resolve quantization config per field from schema
823        let quants: Vec<DenseVectorQuantization> = fields
824            .iter()
825            .map(|(field_id, _)| {
826                schema
827                    .get_field_entry(Field(*field_id))
828                    .and_then(|e| e.dense_vector_config.as_ref())
829                    .map(|c| c.quantization)
830                    .unwrap_or(DenseVectorQuantization::F32)
831            })
832            .collect();
833
834        // Compute sizes using deterministic formula (no serialization needed)
835        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
836        for (i, (_field_id, builder)) in fields.iter().enumerate() {
837            field_sizes.push(FlatVectorData::serialized_binary_size(
838                builder.dim,
839                builder.len(),
840                quants[i],
841            ));
842        }
843
844        use crate::segment::format::{DenseVectorTocEntry, write_dense_toc_and_footer};
845
846        // Data-first format: stream field data, then write TOC + footer at end.
847        // Data starts at file offset 0 → mmap page-aligned, no alignment copies.
848        let mut toc: Vec<DenseVectorTocEntry> = Vec::with_capacity(fields.len() * 2);
849        let mut current_offset = 0u64;
850
851        // Pre-build ANN indexes in parallel across fields.
852        // Each field's ANN build is independent (different vectors, different centroids).
853        let ann_blobs: Vec<(u32, u8, Vec<u8>)> = if let Some(trained) = trained {
854            fields
855                .par_iter()
856                .filter_map(|(field_id, builder)| {
857                    let config = schema
858                        .get_field_entry(Field(*field_id))
859                        .and_then(|e| e.dense_vector_config.as_ref())?;
860
861                    let dim = builder.dim;
862                    let blob = match config.index_type {
863                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
864                            let centroids = &trained.centroids[field_id];
865                            let (mut index, codebook) =
866                                super::ann_build::new_ivf_rabitq(dim, centroids);
867                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
868                                let v = &builder.vectors[i * dim..(i + 1) * dim];
869                                index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
870                            }
871                            super::ann_build::serialize_ivf_rabitq(index, codebook)
872                                .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
873                        }
874                        VectorIndexType::ScaNN
875                            if trained.centroids.contains_key(field_id)
876                                && trained.codebooks.contains_key(field_id) =>
877                        {
878                            let centroids = &trained.centroids[field_id];
879                            let codebook = &trained.codebooks[field_id];
880                            let mut index = super::ann_build::new_scann(dim, centroids, codebook);
881                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
882                                let v = &builder.vectors[i * dim..(i + 1) * dim];
883                                index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
884                            }
885                            super::ann_build::serialize_scann(index, codebook)
886                                .map(|b| (super::ann_build::SCANN_TYPE, b))
887                        }
888                        _ => return None,
889                    };
890                    match blob {
891                        Ok((index_type, bytes)) => {
892                            log::info!(
893                                "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
894                                index_type,
895                                field_id,
896                                builder.doc_ids.len(),
897                                bytes.len()
898                            );
899                            Some((*field_id, index_type, bytes))
900                        }
901                        Err(e) => {
902                            log::warn!(
903                                "[segment_build] ANN serialize failed for field {}: {}",
904                                field_id,
905                                e
906                            );
907                            None
908                        }
909                    }
910                })
911                .collect()
912        } else {
913            Vec::new()
914        };
915
916        // Stream each field's flat data directly (builder → disk, no intermediate buffer)
917        for (i, (_field_id, builder)) in fields.into_iter().enumerate() {
918            let data_offset = current_offset;
919            FlatVectorData::serialize_binary_from_flat_streaming(
920                builder.dim,
921                &builder.vectors,
922                &builder.doc_ids,
923                quants[i],
924                writer,
925            )
926            .map_err(crate::Error::Io)?;
927            current_offset += field_sizes[i] as u64;
928            toc.push(DenseVectorTocEntry {
929                field_id: _field_id,
930                index_type: super::ann_build::FLAT_TYPE,
931                offset: data_offset,
932                size: field_sizes[i] as u64,
933            });
934            // Pad to 8-byte boundary so next field's mmap slice is aligned
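            // e.g. current_offset 13 → pad 3 (next slice starts at 16); offset 16 → pad 0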
935            let pad = (8 - (current_offset % 8)) % 8;
936            if pad > 0 {
937                writer.write_all(&[0u8; 8][..pad as usize])?;
938                current_offset += pad;
939            }
940            // builder dropped here, freeing vector memory before next field
941        }
942
943        // Write ANN blob entries after flat entries
944        for (field_id, index_type, blob) in ann_blobs {
945            let data_offset = current_offset;
946            let blob_len = blob.len() as u64;
947            writer.write_all(&blob)?;
948            current_offset += blob_len;
949            toc.push(DenseVectorTocEntry {
950                field_id,
951                index_type,
952                offset: data_offset,
953                size: blob_len,
954            });
955            let pad = (8 - (current_offset % 8)) % 8;
956            if pad > 0 {
957                writer.write_all(&[0u8; 8][..pad as usize])?;
958                current_offset += pad;
959            }
960        }
961
962        // Write TOC + footer
963        write_dense_toc_and_footer(writer, current_offset, &toc)?;
964
965        Ok(())
966    }
967
968    /// Stream sparse vectors directly to disk (footer-based format).
969    ///
970    /// Data is written first (one dim at a time), then the TOC and footer
971    /// are appended. This matches the dense vectors format pattern.
972    fn build_sparse_streaming(
973        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
974        schema: &Schema,
975        writer: &mut dyn Write,
976    ) -> Result<()> {
977        use crate::segment::format::{SparseFieldToc, write_sparse_toc_and_footer};
978        use crate::structures::{BlockSparsePostingList, WeightQuantization};
979
980        if sparse_vectors.is_empty() {
981            return Ok(());
982        }
983
984        // Collect and sort fields
985        let mut field_ids: Vec<u32> = sparse_vectors.keys().copied().collect();
986        field_ids.sort_unstable();
987
988        let mut field_tocs: Vec<SparseFieldToc> = Vec::new();
989        let mut current_offset = 0u64;
990
991        for &field_id in &field_ids {
992            let builder = sparse_vectors.get_mut(&field_id).unwrap();
993            if builder.is_empty() {
994                continue;
995            }
996
997            let field = crate::dsl::Field(field_id);
998            let sparse_config = schema
999                .get_field_entry(field)
1000                .and_then(|e| e.sparse_vector_config.as_ref());
1001
1002            let quantization = sparse_config
1003                .map(|c| c.weight_quantization)
1004                .unwrap_or(WeightQuantization::Float32);
1005
1006            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
1007            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);
1008
1009            // Parallel: sort + prune + serialize each dimension independently,
1010            // then write sequentially. Each dimension's pipeline is CPU-bound
1011            // and fully independent.
1012            let mut dims: Vec<_> = std::mem::take(&mut builder.postings).into_iter().collect();
1013            dims.sort_unstable_by_key(|(id, _)| *id);
1014
1015            let serialized_dims: Vec<(u32, Vec<u8>)> = dims
1016                .into_par_iter()
1017                .map(|(dim_id, mut postings)| {
1018                    postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
1019
1020                    if let Some(fraction) = pruning_fraction
1021                        && postings.len() > 1
1022                        && fraction < 1.0
1023                    {
1024                        let original_len = postings.len();
1025                        postings.sort_by(|a, b| {
1026                            b.2.abs()
1027                                .partial_cmp(&a.2.abs())
1028                                .unwrap_or(std::cmp::Ordering::Equal)
1029                        });
1030                        let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
1031                        postings.truncate(keep);
1032                        postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
1033                    }
1034
1035                    let block_list = BlockSparsePostingList::from_postings_with_block_size(
1036                        &postings,
1037                        quantization,
1038                        block_size,
1039                    )
1040                    .map_err(crate::Error::Io)?;
1041
1042                    let mut buf = Vec::new();
1043                    block_list.serialize(&mut buf).map_err(crate::Error::Io)?;
1044                    Ok((dim_id, buf))
1045                })
1046                .collect::<Result<Vec<_>>>()?;
1047
1048            // Sequential write (preserves deterministic offset tracking)
1049            let mut dim_entries: Vec<(u32, u64, u32)> = Vec::with_capacity(serialized_dims.len());
1050            for (dim_id, buf) in &serialized_dims {
1051                writer.write_all(buf)?;
1052                dim_entries.push((*dim_id, current_offset, buf.len() as u32));
1053                current_offset += buf.len() as u64;
1054            }
1055
1056            if !dim_entries.is_empty() {
1057                field_tocs.push(SparseFieldToc {
1058                    field_id,
1059                    quantization: quantization as u8,
1060                    dims: dim_entries,
1061                });
1062            }
1063        }
1064
1065        if field_tocs.is_empty() {
1066            return Ok(());
1067        }
1068
1069        let toc_offset = current_offset;
1070        write_sparse_toc_and_footer(writer, toc_offset, &field_tocs).map_err(crate::Error::Io)?;
1071
1072        Ok(())
1073    }
1074
1075    /// Stream positions directly to disk, returning only the offset map.
1076    ///
1077    /// Consumes the position_index and writes each position posting list
1078    /// directly to the writer, tracking offsets for the postings phase.
1079    fn build_positions_streaming(
1080        position_index: HashMap<TermKey, PositionPostingListBuilder>,
1081        term_interner: &Rodeo,
1082        writer: &mut dyn Write,
1083    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
1084        use crate::structures::PositionPostingList;
1085
1086        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1087
1088        // Consume HashMap into Vec for sorting (owned, no borrowing)
1089        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
1090            .into_iter()
1091            .map(|(term_key, pos_builder)| {
1092                let term_str = term_interner.resolve(&term_key.term);
1093                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
1094                key.extend_from_slice(&term_key.field.to_le_bytes());
1095                key.extend_from_slice(term_str.as_bytes());
1096                (key, pos_builder)
1097            })
1098            .collect();
1099
1100        entries.sort_by(|a, b| a.0.cmp(&b.0));
1101
1102        let mut current_offset = 0u64;
1103        let mut buf = Vec::new();
1104
1105        for (key, pos_builder) in entries {
1106            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1107            for (doc_id, positions) in pos_builder.postings {
1108                pos_list.push(doc_id, positions);
1109            }
1110
1111            // Serialize to reusable buffer, then write
1112            buf.clear();
1113            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
1114            writer.write_all(&buf)?;
1115
1116            position_offsets.insert(key, (current_offset, buf.len() as u32));
1117            current_offset += buf.len() as u64;
1118        }
1119
1120        Ok(position_offsets)
1121    }
1122
1123    /// Stream postings directly to disk.
1124    ///
1125    /// Parallel serialization of posting lists, then sequential streaming of
1126    /// term dict and postings data directly to writers (no Vec<u8> accumulation).
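    ///
    /// Term dictionary keys are the field id in little-endian bytes followed by
    /// the raw term bytes, e.g. field 3 + term "rust" →
    /// `[3, 0, 0, 0, b'r', b'u', b's', b't']`.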
1127    fn build_postings_streaming(
1128        inverted_index: HashMap<TermKey, PostingListBuilder>,
1129        term_interner: Rodeo,
1130        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1131        term_dict_writer: &mut dyn Write,
1132        postings_writer: &mut dyn Write,
1133    ) -> Result<()> {
1134        // Phase 1: Consume HashMap into sorted Vec (frees HashMap overhead)
1135        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
1136            .into_iter()
1137            .map(|(term_key, posting_list)| {
1138                let term_str = term_interner.resolve(&term_key.term);
1139                let mut key = Vec::with_capacity(4 + term_str.len());
1140                key.extend_from_slice(&term_key.field.to_le_bytes());
1141                key.extend_from_slice(term_str.as_bytes());
1142                (key, posting_list)
1143            })
1144            .collect();
1145
1146        drop(term_interner);
1147
1148        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1149
1150        // Phase 2: Parallel serialization
1151        // For inline-eligible terms (no positions, few postings), extract doc_ids/tfs
1152        // directly from CompactPosting without creating an intermediate PostingList.
1153        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1154            .into_par_iter()
1155            .map(|(key, posting_builder)| {
1156                let has_positions = position_offsets.contains_key(&key);
1157
1158                // Fast path: try inline first (avoids PostingList + BlockPostingList allocs)
1159                // Uses try_inline_iter to avoid allocating two Vec<u32> per term.
1160                if !has_positions
1161                    && let Some(inline) = TermInfo::try_inline_iter(
1162                        posting_builder.postings.len(),
1163                        posting_builder
1164                            .postings
1165                            .iter()
1166                            .map(|p| (p.doc_id, p.term_freq as u32)),
1167                    )
1168                {
1169                    return Ok((key, SerializedPosting::Inline(inline)));
1170                }
1171
1172                // Slow path: build full PostingList → BlockPostingList → serialize
1173                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1174                for p in &posting_builder.postings {
1175                    full_postings.push(p.doc_id, p.term_freq as u32);
1176                }
1177
1178                let mut posting_bytes = Vec::new();
1179                let block_list =
1180                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
1181                block_list.serialize(&mut posting_bytes)?;
1182                let result = SerializedPosting::External {
1183                    bytes: posting_bytes,
1184                    doc_count: full_postings.doc_count(),
1185                };
1186
1187                Ok((key, result))
1188            })
1189            .collect::<Result<Vec<_>>>()?;
1190
1191        // Phase 3: Stream directly to writers (no intermediate Vec<u8> accumulation)
1192        let mut postings_offset = 0u64;
1193        let mut writer = SSTableWriter::<_, TermInfo>::new(term_dict_writer);
1194
1195        for (key, serialized_posting) in serialized {
1196            let term_info = match serialized_posting {
1197                SerializedPosting::Inline(info) => info,
1198                SerializedPosting::External { bytes, doc_count } => {
1199                    let posting_len = bytes.len() as u32;
1200                    postings_writer.write_all(&bytes)?;
1201
1202                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1203                        TermInfo::external_with_positions(
1204                            postings_offset,
1205                            posting_len,
1206                            doc_count,
1207                            pos_offset,
1208                            pos_len,
1209                        )
1210                    } else {
1211                        TermInfo::external(postings_offset, posting_len, doc_count)
1212                    };
1213                    postings_offset += posting_len as u64;
1214                    info
1215                }
1216            };
1217
1218            writer.insert(&key, &term_info)?;
1219        }
1220
1221        let _ = writer.finish()?;
1222        Ok(())
1223    }
1224
1225    /// Stream compressed document store directly to disk.
1226    ///
1227    /// Reads pre-serialized document bytes from temp file and passes them
1228    /// directly to the store writer via `store_raw`, avoiding the
1229    /// deserialize→Document→reserialize roundtrip entirely.
1230    fn build_store_streaming(
1231        store_path: &PathBuf,
1232        num_compression_threads: usize,
1233        compression_level: CompressionLevel,
1234        writer: &mut dyn Write,
1235    ) -> Result<()> {
1236        use super::store::EagerParallelStoreWriter;
1237
1238        let file = File::open(store_path)?;
1239        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1240
1241        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1242            writer,
1243            num_compression_threads,
1244            compression_level,
1245        );
1246
1247        // Stream pre-serialized doc bytes directly — no deserialization needed.
1248        // Temp file format: [doc_len: u32 LE][doc_bytes: doc_len bytes] repeated.
1249        let mut offset = 0usize;
1250        while offset + 4 <= mmap.len() {
1251            let doc_len = u32::from_le_bytes([
1252                mmap[offset],
1253                mmap[offset + 1],
1254                mmap[offset + 2],
1255                mmap[offset + 3],
1256            ]) as usize;
1257            offset += 4;
1258
1259            if offset + doc_len > mmap.len() {
1260                break;
1261            }
1262
1263            let doc_bytes = &mmap[offset..offset + doc_len];
1264            store_writer.store_raw(doc_bytes)?;
1265            offset += doc_len;
1266        }
1267
1268        store_writer.finish()?;
1269        Ok(())
1270    }
1271}
1272
1273impl Drop for SegmentBuilder {
1274    fn drop(&mut self) {
1275        // Cleanup temp files on drop
1276        let _ = std::fs::remove_file(&self.store_path);
1277    }
1278}