Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::mem::size_of;
20use std::path::PathBuf;
21
22use hashbrown::HashMap;
23use lasso::{Rodeo, Spur};
24use rayon::prelude::*;
25use rustc_hash::FxHashMap;
26
27use crate::compression::CompressionLevel;
28
29use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
30use crate::directories::{Directory, DirectoryWriter};
31use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
32use crate::structures::{PostingList, SSTableWriter, TermInfo};
33use crate::tokenizer::BoxedTokenizer;
34use crate::{DocId, Result};
35
36use posting::{
37    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
38};
39use vectors::{DenseVectorBuilder, SparseVectorBuilder};
40
41// Re-export from vector_data for backwards compatibility
42pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
43
/// Capacity, in bytes, of the write buffer placed in front of the temporary document store file
const STORE_BUFFER_SIZE: usize = 16 << 20; // 16 MiB
46
47/// Segment builder with optimized memory usage
48///
49/// Features:
50/// - Streams documents to disk immediately (no in-memory document storage)
51/// - Uses string interning for terms (reduced allocations)
52/// - Uses hashbrown HashMap (faster than BTreeMap)
53pub struct SegmentBuilder {
54    schema: Schema,
55    config: SegmentBuilderConfig,
56    tokenizers: FxHashMap<Field, BoxedTokenizer>,
57
58    /// String interner for terms - O(1) lookup and deduplication
59    term_interner: Rodeo,
60
61    /// Inverted index: term key -> posting list
62    inverted_index: HashMap<TermKey, PostingListBuilder>,
63
64    /// Streaming document store writer
65    store_file: BufWriter<File>,
66    store_path: PathBuf,
67
68    /// Document count
69    next_doc_id: DocId,
70
71    /// Per-field statistics for BM25F
72    field_stats: FxHashMap<u32, FieldStats>,
73
74    /// Per-document field lengths stored compactly
75    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
76    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
77    doc_field_lengths: Vec<u32>,
78    num_indexed_fields: usize,
79    field_to_slot: FxHashMap<u32, usize>,
80
81    /// Reusable buffer for per-document term frequency aggregation
82    /// Avoids allocating a new hashmap for each document
83    local_tf_buffer: FxHashMap<Spur, u32>,
84
85    /// Reusable buffer for tokenization to avoid per-token String allocations
86    token_buffer: String,
87
88    /// Dense vector storage per field: field -> (doc_ids, vectors)
89    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
90    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
91
92    /// Sparse vector storage per field: field -> SparseVectorBuilder
93    /// Uses proper BlockSparsePostingList with configurable quantization
94    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
95
96    /// Position index for fields with positions enabled
97    /// term key -> position posting list
98    position_index: HashMap<TermKey, PositionPostingListBuilder>,
99
100    /// Fields that have position tracking enabled, with their mode
101    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
102
103    /// Current element ordinal for multi-valued fields (reset per document)
104    current_element_ordinal: FxHashMap<u32, u32>,
105
106    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
107    estimated_memory: usize,
108}
109
110impl SegmentBuilder {
111    /// Create a new segment builder
112    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
113        let segment_id = uuid::Uuid::new_v4();
114        let store_path = config
115            .temp_dir
116            .join(format!("hermes_store_{}.tmp", segment_id));
117
118        let store_file = BufWriter::with_capacity(
119            STORE_BUFFER_SIZE,
120            OpenOptions::new()
121                .create(true)
122                .write(true)
123                .truncate(true)
124                .open(&store_path)?,
125        );
126
127        // Count indexed fields for compact field length storage
128        // Also track which fields have position recording enabled
129        let mut num_indexed_fields = 0;
130        let mut field_to_slot = FxHashMap::default();
131        let mut position_enabled_fields = FxHashMap::default();
132        for (field, entry) in schema.fields() {
133            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
134                field_to_slot.insert(field.0, num_indexed_fields);
135                num_indexed_fields += 1;
136                if entry.positions.is_some() {
137                    position_enabled_fields.insert(field.0, entry.positions);
138                }
139            }
140        }
141
142        Ok(Self {
143            schema,
144            tokenizers: FxHashMap::default(),
145            term_interner: Rodeo::new(),
146            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
147            store_file,
148            store_path,
149            next_doc_id: 0,
150            field_stats: FxHashMap::default(),
151            doc_field_lengths: Vec::new(),
152            num_indexed_fields,
153            field_to_slot,
154            local_tf_buffer: FxHashMap::default(),
155            token_buffer: String::with_capacity(64),
156            config,
157            dense_vectors: FxHashMap::default(),
158            sparse_vectors: FxHashMap::default(),
159            position_index: HashMap::new(),
160            position_enabled_fields,
161            current_element_ordinal: FxHashMap::default(),
162            estimated_memory: 0,
163        })
164    }
165
166    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
167        self.tokenizers.insert(field, tokenizer);
168    }
169
170    pub fn num_docs(&self) -> u32 {
171        self.next_doc_id
172    }
173
174    /// Fast O(1) memory estimate - updated incrementally during indexing
175    #[inline]
176    pub fn estimated_memory_bytes(&self) -> usize {
177        self.estimated_memory
178    }
179
180    /// Recalibrate incremental memory estimate using capacity-based calculation.
181    /// More expensive than estimated_memory_bytes() — O(terms + dims) vs O(1) —
182    /// but accounts for Vec capacity growth (doubling) and HashMap table overhead.
183    /// Call periodically (e.g. every 1000 docs) to prevent drift.
184    pub fn recalibrate_memory(&mut self) {
185        self.estimated_memory = self.stats().estimated_memory_bytes;
186    }
187
188    /// Count total unique sparse dimensions across all fields
189    pub fn sparse_dim_count(&self) -> usize {
190        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
191    }
192
193    /// Get current statistics for debugging performance (expensive - iterates all data)
194    pub fn stats(&self) -> SegmentBuilderStats {
195        use std::mem::size_of;
196
197        let postings_in_memory: usize =
198            self.inverted_index.values().map(|p| p.postings.len()).sum();
199
200        // Size constants computed from actual types
201        let compact_posting_size = size_of::<CompactPosting>();
202        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
203        let term_key_size = size_of::<TermKey>();
204        let posting_builder_size = size_of::<PostingListBuilder>();
205        let spur_size = size_of::<lasso::Spur>();
206        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
207
208        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
209        // Measured: ~(key_size + value_size + 8) per entry on average
210        let hashmap_entry_base_overhead = 8usize;
211
212        // FxHashMap uses same layout as hashbrown
213        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
214
215        // Postings memory
216        let postings_bytes: usize = self
217            .inverted_index
218            .values()
219            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
220            .sum();
221
222        // Inverted index overhead
223        let index_overhead_bytes = self.inverted_index.len()
224            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
225
226        // Term interner: Rodeo stores strings + metadata
227        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
228        let interner_arena_overhead = 2 * size_of::<usize>();
229        let avg_term_len = 8; // Estimated average term length
230        let interner_bytes =
231            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
232
233        // Doc field lengths
234        let field_lengths_bytes =
235            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
236
237        // Dense vectors
238        let mut dense_vectors_bytes: usize = 0;
239        let mut dense_vector_count: usize = 0;
240        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
241        for b in self.dense_vectors.values() {
242            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
243                + b.doc_ids.capacity() * doc_id_ordinal_size
244                + 2 * vec_overhead; // Two Vecs
245            dense_vector_count += b.doc_ids.len();
246        }
247
248        // Local buffers
249        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
250        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
251
252        // Sparse vectors
253        let mut sparse_vectors_bytes: usize = 0;
254        for builder in self.sparse_vectors.values() {
255            for postings in builder.postings.values() {
256                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
257            }
258            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
259            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
260            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
261        }
262        // Outer FxHashMap overhead
263        let outer_sparse_entry_size =
264            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
265        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
266
267        // Position index
268        let mut position_index_bytes: usize = 0;
269        for pos_builder in self.position_index.values() {
270            for (_, positions) in &pos_builder.postings {
271                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
272            }
273            // Vec<(DocId, Vec<u32>)> entry size
274            let pos_entry_size = size_of::<DocId>() + vec_overhead;
275            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
276        }
277        // HashMap overhead for position_index
278        let pos_index_entry_size =
279            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
280        position_index_bytes += self.position_index.len() * pos_index_entry_size;
281
282        let estimated_memory_bytes = postings_bytes
283            + index_overhead_bytes
284            + interner_bytes
285            + field_lengths_bytes
286            + dense_vectors_bytes
287            + local_tf_buffer_bytes
288            + sparse_vectors_bytes
289            + position_index_bytes;
290
291        let memory_breakdown = MemoryBreakdown {
292            postings_bytes,
293            index_overhead_bytes,
294            interner_bytes,
295            field_lengths_bytes,
296            dense_vectors_bytes,
297            dense_vector_count,
298            sparse_vectors_bytes,
299            position_index_bytes,
300        };
301
302        SegmentBuilderStats {
303            num_docs: self.next_doc_id,
304            unique_terms: self.inverted_index.len(),
305            postings_in_memory,
306            interned_strings: self.term_interner.len(),
307            doc_field_lengths_size: self.doc_field_lengths.len(),
308            estimated_memory_bytes,
309            memory_breakdown,
310        }
311    }
312
313    /// Add a document - streams to disk immediately
314    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
315        let doc_id = self.next_doc_id;
316        self.next_doc_id += 1;
317
318        // Initialize field lengths for this document
319        let base_idx = self.doc_field_lengths.len();
320        self.doc_field_lengths
321            .resize(base_idx + self.num_indexed_fields, 0);
322        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
323
324        // Reset element ordinals for this document (for multi-valued fields)
325        self.current_element_ordinal.clear();
326
327        for (field, value) in doc.field_values() {
328            let entry = self.schema.get_field_entry(*field);
329            if entry.is_none() || !entry.unwrap().indexed {
330                continue;
331            }
332
333            let entry = entry.unwrap();
334            match (&entry.field_type, value) {
335                (FieldType::Text, FieldValue::Text(text)) => {
336                    // Get current element ordinal for multi-valued fields
337                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
338                    let token_count =
339                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
340                    // Increment element ordinal for next value of this field
341                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
342
343                    // Update field statistics
344                    let stats = self.field_stats.entry(field.0).or_default();
345                    stats.total_tokens += token_count as u64;
346                    // Only count each document once, even for multi-value fields
347                    if element_ordinal == 0 {
348                        stats.doc_count += 1;
349                    }
350
351                    // Store field length compactly
352                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
353                        self.doc_field_lengths[base_idx + slot] = token_count;
354                    }
355                }
356                (FieldType::U64, FieldValue::U64(v)) => {
357                    self.index_numeric_field(*field, doc_id, *v)?;
358                }
359                (FieldType::I64, FieldValue::I64(v)) => {
360                    self.index_numeric_field(*field, doc_id, *v as u64)?;
361                }
362                (FieldType::F64, FieldValue::F64(v)) => {
363                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
364                }
365                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
366                    // Get current element ordinal for multi-valued fields
367                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
368                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
369                    // Increment element ordinal for next value of this field
370                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
371                }
372                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
373                    // Get current element ordinal for multi-valued fields
374                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
375                    self.index_sparse_vector_field(
376                        *field,
377                        doc_id,
378                        element_ordinal as u16,
379                        entries,
380                    )?;
381                    // Increment element ordinal for next value of this field
382                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
383                }
384                _ => {}
385            }
386        }
387
388        // Stream document to disk immediately
389        self.write_document_to_store(&doc)?;
390
391        Ok(doc_id)
392    }
393
394    /// Index a text field using interned terms
395    ///
396    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
397    /// Instead of allocating a String per token, we:
398    /// 1. Iterate over whitespace-split words
399    /// 2. Build lowercase token in a reusable buffer
400    /// 3. Intern directly from the buffer
401    ///
402    /// If position recording is enabled for this field, also records token positions
403    /// encoded as (element_ordinal << 20) | token_position.
404    fn index_text_field(
405        &mut self,
406        field: Field,
407        doc_id: DocId,
408        text: &str,
409        element_ordinal: u32,
410    ) -> Result<u32> {
411        use crate::dsl::PositionMode;
412
413        let field_id = field.0;
414        let position_mode = self
415            .position_enabled_fields
416            .get(&field_id)
417            .copied()
418            .flatten();
419
420        // Phase 1: Aggregate term frequencies within this document
421        // Also collect positions if enabled
422        // Reuse buffer to avoid allocations
423        self.local_tf_buffer.clear();
424
425        // For position tracking: term -> list of positions in this text
426        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
427
428        let mut token_position = 0u32;
429
430        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
431        for word in text.split_whitespace() {
432            // Build lowercase token in reusable buffer
433            self.token_buffer.clear();
434            for c in word.chars() {
435                if c.is_alphanumeric() {
436                    for lc in c.to_lowercase() {
437                        self.token_buffer.push(lc);
438                    }
439                }
440            }
441
442            if self.token_buffer.is_empty() {
443                continue;
444            }
445
446            // Intern the term directly from buffer - O(1) amortized
447            let is_new_string = !self.term_interner.contains(&self.token_buffer);
448            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
449            if is_new_string {
450                use std::mem::size_of;
451                // string bytes + Spur + arena overhead (2 pointers)
452                self.estimated_memory +=
453                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
454            }
455            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
456
457            // Record position based on mode
458            if let Some(mode) = position_mode {
459                let encoded_pos = match mode {
460                    // Ordinal only: just store element ordinal (token position = 0)
461                    PositionMode::Ordinal => element_ordinal << 20,
462                    // Token position only: just store token position (ordinal = 0)
463                    PositionMode::TokenPosition => token_position,
464                    // Full: encode both
465                    PositionMode::Full => (element_ordinal << 20) | token_position,
466                };
467                local_positions
468                    .entry(term_spur)
469                    .or_default()
470                    .push(encoded_pos);
471            }
472
473            token_position += 1;
474        }
475
476        // Phase 2: Insert aggregated terms into inverted index
477        // Now we only do one inverted_index lookup per unique term in doc
478        for (&term_spur, &tf) in &self.local_tf_buffer {
479            let term_key = TermKey {
480                field: field_id,
481                term: term_spur,
482            };
483
484            let is_new_term = !self.inverted_index.contains_key(&term_key);
485            let posting = self
486                .inverted_index
487                .entry(term_key)
488                .or_insert_with(PostingListBuilder::new);
489            posting.add(doc_id, tf);
490
491            // Incremental memory tracking
492            use std::mem::size_of;
493            self.estimated_memory += size_of::<CompactPosting>();
494            if is_new_term {
495                // HashMap entry overhead + PostingListBuilder + Vec header
496                self.estimated_memory +=
497                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
498            }
499
500            // Add positions if enabled
501            if position_mode.is_some()
502                && let Some(positions) = local_positions.get(&term_spur)
503            {
504                let is_new_pos_term = !self.position_index.contains_key(&term_key);
505                let pos_posting = self
506                    .position_index
507                    .entry(term_key)
508                    .or_insert_with(PositionPostingListBuilder::new);
509                for &pos in positions {
510                    pos_posting.add_position(doc_id, pos);
511                }
512                // Incremental memory tracking for position index
513                self.estimated_memory += positions.len() * size_of::<u32>();
514                if is_new_pos_term {
515                    self.estimated_memory +=
516                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
517                }
518            }
519        }
520
521        Ok(token_position)
522    }
523
524    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
525        use std::mem::size_of;
526
527        // For numeric fields, we use a special encoding
528        let term_str = format!("__num_{}", value);
529        let is_new_string = !self.term_interner.contains(&term_str);
530        let term_spur = self.term_interner.get_or_intern(&term_str);
531
532        let term_key = TermKey {
533            field: field.0,
534            term: term_spur,
535        };
536
537        let is_new_term = !self.inverted_index.contains_key(&term_key);
538        let posting = self
539            .inverted_index
540            .entry(term_key)
541            .or_insert_with(PostingListBuilder::new);
542        posting.add(doc_id, 1);
543
544        // Incremental memory tracking
545        self.estimated_memory += size_of::<CompactPosting>();
546        if is_new_term {
547            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
548        }
549        if is_new_string {
550            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
551        }
552
553        Ok(())
554    }
555
556    /// Index a dense vector field with ordinal tracking
557    fn index_dense_vector_field(
558        &mut self,
559        field: Field,
560        doc_id: DocId,
561        ordinal: u16,
562        vector: &[f32],
563    ) -> Result<()> {
564        let dim = vector.len();
565
566        let builder = self
567            .dense_vectors
568            .entry(field.0)
569            .or_insert_with(|| DenseVectorBuilder::new(dim));
570
571        // Verify dimension consistency
572        if builder.dim != dim && builder.len() > 0 {
573            return Err(crate::Error::Schema(format!(
574                "Dense vector dimension mismatch: expected {}, got {}",
575                builder.dim, dim
576            )));
577        }
578
579        builder.add(doc_id, ordinal, vector);
580
581        // Incremental memory tracking
582        use std::mem::{size_of, size_of_val};
583        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
584
585        Ok(())
586    }
587
588    /// Index a sparse vector field using dedicated sparse posting lists
589    ///
590    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
591    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
592    ///
593    /// Weights below the configured `weight_threshold` are not indexed.
594    fn index_sparse_vector_field(
595        &mut self,
596        field: Field,
597        doc_id: DocId,
598        ordinal: u16,
599        entries: &[(u32, f32)],
600    ) -> Result<()> {
601        // Get weight threshold from field config (default 0.0 = no filtering)
602        let weight_threshold = self
603            .schema
604            .get_field_entry(field)
605            .and_then(|entry| entry.sparse_vector_config.as_ref())
606            .map(|config| config.weight_threshold)
607            .unwrap_or(0.0);
608
609        let builder = self
610            .sparse_vectors
611            .entry(field.0)
612            .or_insert_with(SparseVectorBuilder::new);
613
614        for &(dim_id, weight) in entries {
615            // Skip weights below threshold
616            if weight.abs() < weight_threshold {
617                continue;
618            }
619
620            // Incremental memory tracking
621            use std::mem::size_of;
622            let is_new_dim = !builder.postings.contains_key(&dim_id);
623            builder.add(dim_id, doc_id, ordinal, weight);
624            self.estimated_memory += size_of::<(DocId, u16, f32)>();
625            if is_new_dim {
626                // HashMap entry overhead + Vec header
627                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
628            }
629        }
630
631        Ok(())
632    }
633
634    /// Write document to streaming store
635    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
636        use byteorder::{LittleEndian, WriteBytesExt};
637
638        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
639
640        self.store_file
641            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
642        self.store_file.write_all(&doc_bytes)?;
643
644        Ok(())
645    }
646
647    /// Build the final segment
648    ///
649    /// Streams all data directly to disk via StreamingWriter to avoid buffering
650    /// entire serialized outputs in memory. Each phase consumes and drops its
651    /// source data before the next phase begins.
652    pub async fn build<D: Directory + DirectoryWriter>(
653        mut self,
654        dir: &D,
655        segment_id: SegmentId,
656    ) -> Result<SegmentMeta> {
657        // Flush any buffered data
658        self.store_file.flush()?;
659
660        let files = SegmentFiles::new(segment_id.0);
661
662        // Phase 1: Stream positions directly to disk (consumes position_index)
663        let position_index = std::mem::take(&mut self.position_index);
664        let position_offsets = if !position_index.is_empty() {
665            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
666            let offsets = Self::build_positions_streaming(
667                position_index,
668                &self.term_interner,
669                &mut *pos_writer,
670            )?;
671            pos_writer.finish()?;
672            offsets
673        } else {
674            FxHashMap::default()
675        };
676
677        // Phase 2: Build postings + store — stream directly to disk
678        let inverted_index = std::mem::take(&mut self.inverted_index);
679        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
680        let store_path = self.store_path.clone();
681        let schema_clone = self.schema.clone();
682        let num_compression_threads = self.config.num_compression_threads;
683        let compression_level = self.config.compression_level;
684
685        // Create streaming writers (async) before entering sync build phases
686        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
687        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
688        let mut store_writer = dir.streaming_writer(&files.store).await?;
689
690        let (postings_result, store_result) = rayon::join(
691            || {
692                Self::build_postings_streaming(
693                    inverted_index,
694                    term_interner,
695                    &position_offsets,
696                    &mut *term_dict_writer,
697                    &mut *postings_writer,
698                )
699            },
700            || {
701                Self::build_store_streaming(
702                    &store_path,
703                    &schema_clone,
704                    num_compression_threads,
705                    compression_level,
706                    &mut *store_writer,
707                )
708            },
709        );
710        postings_result?;
711        store_result?;
712        term_dict_writer.finish()?;
713        postings_writer.finish()?;
714        store_writer.finish()?;
715        drop(position_offsets);
716
717        // Phase 3: Dense vectors — stream directly to disk
718        let dense_vectors = std::mem::take(&mut self.dense_vectors);
719        if !dense_vectors.is_empty() {
720            let mut writer = dir.streaming_writer(&files.vectors).await?;
721            Self::build_vectors_streaming(dense_vectors, &self.schema, &mut *writer)?;
722            writer.finish()?;
723        }
724
725        // Phase 4: Sparse vectors — stream directly to disk
726        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
727        if !sparse_vectors.is_empty() {
728            let mut writer = dir.streaming_writer(&files.sparse).await?;
729            Self::build_sparse_streaming(&mut sparse_vectors, &self.schema, &mut *writer)?;
730            drop(sparse_vectors);
731            writer.finish()?;
732        }
733
734        let meta = SegmentMeta {
735            id: segment_id.0,
736            num_docs: self.next_doc_id,
737            field_stats: self.field_stats.clone(),
738        };
739
740        dir.write(&files.meta, &meta.serialize()?).await?;
741
742        // Cleanup temp files
743        let _ = std::fs::remove_file(&self.store_path);
744
745        Ok(meta)
746    }
747
748    /// Stream dense vectors directly to disk (zero-buffer for vector data).
749    ///
750    /// Computes sizes deterministically (no trial serialization needed), writes
751    /// a small header, then streams each field's raw f32 data directly to the writer.
752    fn build_vectors_streaming(
753        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
754        schema: &Schema,
755        writer: &mut dyn Write,
756    ) -> Result<()> {
757        use byteorder::{LittleEndian, WriteBytesExt};
758
759        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
760            .into_iter()
761            .filter(|(_, b)| b.len() > 0)
762            .collect();
763        fields.sort_by_key(|(id, _)| *id);
764
765        if fields.is_empty() {
766            return Ok(());
767        }
768
769        // Compute sizes using deterministic formula (no serialization needed)
770        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
771        for (field_id, builder) in &fields {
772            let field = crate::dsl::Field(*field_id);
773            let dense_config = schema
774                .get_field_entry(field)
775                .and_then(|e| e.dense_vector_config.as_ref());
776            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
777            field_sizes.push(FlatVectorData::serialized_binary_size(
778                index_dim,
779                builder.len(),
780            ));
781        }
782
783        // Write header (tiny — ~25 bytes per field)
784        // num_fields(u32) + per-field: field_id(u32) + index_type(u8) + offset(u64) + length(u64)
785        let per_field_entry =
786            size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
787        let header_size = size_of::<u32>() + fields.len() * per_field_entry;
788        let mut header = Vec::with_capacity(header_size);
789        header.write_u32::<LittleEndian>(fields.len() as u32)?;
790
791        let mut current_offset = header_size as u64;
792        for (i, (field_id, _)) in fields.iter().enumerate() {
793            header.write_u32::<LittleEndian>(*field_id)?;
794            const FLAT_BINARY_INDEX_TYPE: u8 = 4;
795            header.write_u8(FLAT_BINARY_INDEX_TYPE)?;
796            header.write_u64::<LittleEndian>(current_offset)?;
797            header.write_u64::<LittleEndian>(field_sizes[i] as u64)?;
798            current_offset += field_sizes[i] as u64;
799        }
800        writer.write_all(&header)?;
801
802        // Stream each field's data directly (builder → disk, no intermediate buffer)
803        for (field_id, builder) in fields {
804            let field = crate::dsl::Field(field_id);
805            let dense_config = schema
806                .get_field_entry(field)
807                .and_then(|e| e.dense_vector_config.as_ref());
808            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
809
810            FlatVectorData::serialize_binary_from_flat_streaming(
811                index_dim,
812                &builder.vectors,
813                builder.dim,
814                &builder.doc_ids,
815                writer,
816            )
817            .map_err(crate::Error::Io)?;
818            // builder dropped here, freeing vector memory before next field
819        }
820
821        Ok(())
822    }
823
824    /// Stream sparse vectors directly to disk.
825    ///
826    /// Serializes per-dimension posting lists (needed for header offset computation),
827    /// writes header, then streams each dimension's data directly to the writer.
828    /// Eliminates the triple-buffered approach (dim_bytes + all_data + output).
829    fn build_sparse_streaming(
830        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
831        schema: &Schema,
832        writer: &mut dyn Write,
833    ) -> Result<()> {
834        use crate::structures::{BlockSparsePostingList, WeightQuantization};
835        use byteorder::{LittleEndian, WriteBytesExt};
836
837        if sparse_vectors.is_empty() {
838            return Ok(());
839        }
840
841        // Phase 1: Serialize all dims, keep bytes for offset computation
842        type DimEntry = (u32, Vec<u8>);
843        type FieldEntry = (u32, WeightQuantization, Vec<DimEntry>);
844        let mut field_data: Vec<FieldEntry> = Vec::new();
845
846        for (&field_id, builder) in sparse_vectors.iter_mut() {
847            if builder.is_empty() {
848                continue;
849            }
850
851            let field = crate::dsl::Field(field_id);
852            let sparse_config = schema
853                .get_field_entry(field)
854                .and_then(|e| e.sparse_vector_config.as_ref());
855
856            let quantization = sparse_config
857                .map(|c| c.weight_quantization)
858                .unwrap_or(WeightQuantization::Float32);
859
860            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
861            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);
862
863            let mut dims: Vec<DimEntry> = Vec::new();
864
865            for (&dim_id, postings) in builder.postings.iter_mut() {
866                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
867
868                if let Some(fraction) = pruning_fraction
869                    && postings.len() > 1
870                    && fraction < 1.0
871                {
872                    let original_len = postings.len();
873                    postings.sort_by(|a, b| {
874                        b.2.abs()
875                            .partial_cmp(&a.2.abs())
876                            .unwrap_or(std::cmp::Ordering::Equal)
877                    });
878                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
879                    postings.truncate(keep);
880                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
881                }
882
883                let block_list = BlockSparsePostingList::from_postings_with_block_size(
884                    postings,
885                    quantization,
886                    block_size,
887                )
888                .map_err(crate::Error::Io)?;
889
890                let mut bytes = Vec::new();
891                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
892                dims.push((dim_id, bytes));
893            }
894
895            dims.sort_by_key(|(id, _)| *id);
896            field_data.push((field_id, quantization, dims));
897        }
898
899        if field_data.is_empty() {
900            return Ok(());
901        }
902
903        field_data.sort_by_key(|(id, _, _)| *id);
904
905        // Phase 2: Compute header size and offsets
906        // num_fields(u32) + per-field: field_id(u32) + quantization(u8) + num_dims(u32)
907        // per-dim: dim_id(u32) + offset(u64) + length(u32)
908        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
909        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
910        let mut header_size = size_of::<u32>() as u64;
911        for (_, _, dims) in &field_data {
912            header_size += per_field_header as u64;
913            header_size += (dims.len() as u64) * per_dim_entry as u64;
914        }
915
916        let mut header = Vec::with_capacity(header_size as usize);
917        header.write_u32::<LittleEndian>(field_data.len() as u32)?;
918
919        let mut current_offset = header_size;
920        for (field_id, quantization, dims) in &field_data {
921            header.write_u32::<LittleEndian>(*field_id)?;
922            header.write_u8(*quantization as u8)?;
923            header.write_u32::<LittleEndian>(dims.len() as u32)?;
924
925            for (dim_id, bytes) in dims {
926                header.write_u32::<LittleEndian>(*dim_id)?;
927                header.write_u64::<LittleEndian>(current_offset)?;
928                header.write_u32::<LittleEndian>(bytes.len() as u32)?;
929                current_offset += bytes.len() as u64;
930            }
931        }
932
933        // Phase 3: Write header, then stream each dimension's data
934        writer.write_all(&header)?;
935
936        for (_, _, dims) in field_data {
937            for (_, bytes) in dims {
938                writer.write_all(&bytes)?;
939                // bytes dropped here — one dim at a time
940            }
941        }
942
943        Ok(())
944    }
945
946    /// Stream positions directly to disk, returning only the offset map.
947    ///
948    /// Consumes the position_index and writes each position posting list
949    /// directly to the writer, tracking offsets for the postings phase.
950    fn build_positions_streaming(
951        position_index: HashMap<TermKey, PositionPostingListBuilder>,
952        term_interner: &Rodeo,
953        writer: &mut dyn Write,
954    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
955        use crate::structures::PositionPostingList;
956
957        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
958
959        // Consume HashMap into Vec for sorting (owned, no borrowing)
960        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
961            .into_iter()
962            .map(|(term_key, pos_builder)| {
963                let term_str = term_interner.resolve(&term_key.term);
964                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
965                key.extend_from_slice(&term_key.field.to_le_bytes());
966                key.extend_from_slice(term_str.as_bytes());
967                (key, pos_builder)
968            })
969            .collect();
970
971        entries.sort_by(|a, b| a.0.cmp(&b.0));
972
973        let mut current_offset = 0u64;
974
975        for (key, pos_builder) in entries {
976            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
977            for (doc_id, positions) in pos_builder.postings {
978                pos_list.push(doc_id, positions);
979            }
980
981            // Serialize to a temp buffer to get exact length, then write
982            let mut buf = Vec::new();
983            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
984            writer.write_all(&buf)?;
985
986            position_offsets.insert(key, (current_offset, buf.len() as u32));
987            current_offset += buf.len() as u64;
988        }
989
990        Ok(position_offsets)
991    }
992
993    /// Stream postings directly to disk.
994    ///
995    /// Parallel serialization of posting lists, then sequential streaming of
996    /// term dict and postings data directly to writers (no Vec<u8> accumulation).
997    fn build_postings_streaming(
998        inverted_index: HashMap<TermKey, PostingListBuilder>,
999        term_interner: Rodeo,
1000        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1001        term_dict_writer: &mut dyn Write,
1002        postings_writer: &mut dyn Write,
1003    ) -> Result<()> {
1004        // Phase 1: Consume HashMap into sorted Vec (frees HashMap overhead)
1005        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
1006            .into_iter()
1007            .map(|(term_key, posting_list)| {
1008                let term_str = term_interner.resolve(&term_key.term);
1009                let mut key = Vec::with_capacity(4 + term_str.len());
1010                key.extend_from_slice(&term_key.field.to_le_bytes());
1011                key.extend_from_slice(term_str.as_bytes());
1012                (key, posting_list)
1013            })
1014            .collect();
1015
1016        drop(term_interner);
1017
1018        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1019
1020        // Phase 2: Parallel serialization
1021        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1022            .into_par_iter()
1023            .map(|(key, posting_builder)| {
1024                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1025                for p in &posting_builder.postings {
1026                    full_postings.push(p.doc_id, p.term_freq as u32);
1027                }
1028
1029                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1030                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1031
1032                let has_positions = position_offsets.contains_key(&key);
1033                let result = if !has_positions
1034                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1035                {
1036                    SerializedPosting::Inline(inline)
1037                } else {
1038                    let mut posting_bytes = Vec::new();
1039                    let block_list =
1040                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
1041                    block_list.serialize(&mut posting_bytes)?;
1042                    SerializedPosting::External {
1043                        bytes: posting_bytes,
1044                        doc_count: full_postings.doc_count(),
1045                    }
1046                };
1047
1048                Ok((key, result))
1049            })
1050            .collect::<Result<Vec<_>>>()?;
1051
1052        // Phase 3: Stream directly to writers (no intermediate Vec<u8> accumulation)
1053        let mut postings_offset = 0u64;
1054        let mut writer = SSTableWriter::<TermInfo>::new(term_dict_writer);
1055
1056        for (key, serialized_posting) in serialized {
1057            let term_info = match serialized_posting {
1058                SerializedPosting::Inline(info) => info,
1059                SerializedPosting::External { bytes, doc_count } => {
1060                    let posting_len = bytes.len() as u32;
1061                    postings_writer.write_all(&bytes)?;
1062
1063                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1064                        TermInfo::external_with_positions(
1065                            postings_offset,
1066                            posting_len,
1067                            doc_count,
1068                            pos_offset,
1069                            pos_len,
1070                        )
1071                    } else {
1072                        TermInfo::external(postings_offset, posting_len, doc_count)
1073                    };
1074                    postings_offset += posting_len as u64;
1075                    info
1076                }
1077            };
1078
1079            writer.insert(&key, &term_info)?;
1080        }
1081
1082        writer.finish()?;
1083        Ok(())
1084    }
1085
1086    /// Stream compressed document store directly to disk.
1087    ///
1088    /// Reads documents from temp file, compresses in parallel batches,
1089    /// and writes directly to the streaming writer (no Vec<u8> accumulation).
1090    fn build_store_streaming(
1091        store_path: &PathBuf,
1092        schema: &Schema,
1093        num_compression_threads: usize,
1094        compression_level: CompressionLevel,
1095        writer: &mut dyn Write,
1096    ) -> Result<()> {
1097        use super::store::EagerParallelStoreWriter;
1098
1099        let file = File::open(store_path)?;
1100        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1101
1102        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1103        let mut offset = 0usize;
1104        while offset + 4 <= mmap.len() {
1105            let doc_len = u32::from_le_bytes([
1106                mmap[offset],
1107                mmap[offset + 1],
1108                mmap[offset + 2],
1109                mmap[offset + 3],
1110            ]) as usize;
1111            offset += 4;
1112
1113            if offset + doc_len > mmap.len() {
1114                break;
1115            }
1116
1117            doc_ranges.push((offset, doc_len));
1118            offset += doc_len;
1119        }
1120
1121        const BATCH_SIZE: usize = 10_000;
1122        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1123            writer,
1124            num_compression_threads,
1125            compression_level,
1126        );
1127
1128        for batch in doc_ranges.chunks(BATCH_SIZE) {
1129            let batch_docs: Vec<Document> = batch
1130                .par_iter()
1131                .filter_map(|&(start, len)| {
1132                    let doc_bytes = &mmap[start..start + len];
1133                    super::store::deserialize_document(doc_bytes, schema).ok()
1134                })
1135                .collect();
1136
1137            for doc in &batch_docs {
1138                store_writer.store(doc, schema)?;
1139            }
1140        }
1141
1142        store_writer.finish()?;
1143        Ok(())
1144    }
1145}
1146
1147impl Drop for SegmentBuilder {
1148    fn drop(&mut self) {
1149        // Cleanup temp files on drop
1150        let _ = std::fs::remove_file(&self.store_path);
1151    }
1152}