Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::mem::size_of;
20use std::path::PathBuf;
21
22use hashbrown::HashMap;
23use lasso::{Rodeo, Spur};
24use rayon::prelude::*;
25use rustc_hash::FxHashMap;
26
27use crate::compression::CompressionLevel;
28
29use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
30use crate::directories::{Directory, DirectoryWriter};
31use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
32use crate::structures::{PostingList, SSTableWriter, TermInfo};
33use crate::tokenizer::BoxedTokenizer;
34use crate::{DocId, Result};
35
36use posting::{
37    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
38};
39use vectors::{DenseVectorBuilder, SparseVectorBuilder};
40
41use super::vector_data::FlatVectorData;
42
43/// Size of the document store buffer before writing to disk
44const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
45
46/// Segment builder with optimized memory usage
47///
48/// Features:
49/// - Streams documents to disk immediately (no in-memory document storage)
50/// - Uses string interning for terms (reduced allocations)
51/// - Uses hashbrown HashMap (faster than BTreeMap)
52pub struct SegmentBuilder {
53    schema: Schema,
54    config: SegmentBuilderConfig,
55    tokenizers: FxHashMap<Field, BoxedTokenizer>,
56
57    /// String interner for terms - O(1) lookup and deduplication
58    term_interner: Rodeo,
59
60    /// Inverted index: term key -> posting list
61    inverted_index: HashMap<TermKey, PostingListBuilder>,
62
63    /// Streaming document store writer
64    store_file: BufWriter<File>,
65    store_path: PathBuf,
66
67    /// Document count
68    next_doc_id: DocId,
69
70    /// Per-field statistics for BM25F
71    field_stats: FxHashMap<u32, FieldStats>,
72
73    /// Per-document field lengths stored compactly
74    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
75    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
76    doc_field_lengths: Vec<u32>,
77    num_indexed_fields: usize,
78    field_to_slot: FxHashMap<u32, usize>,
79
80    /// Reusable buffer for per-document term frequency aggregation
81    /// Avoids allocating a new hashmap for each document
82    local_tf_buffer: FxHashMap<Spur, u32>,
83
84    /// Reusable buffer for tokenization to avoid per-token String allocations
85    token_buffer: String,
86
87    /// Dense vector storage per field: field -> (doc_ids, vectors)
88    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
89    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
90
91    /// Sparse vector storage per field: field -> SparseVectorBuilder
92    /// Uses proper BlockSparsePostingList with configurable quantization
93    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
94
95    /// Position index for fields with positions enabled
96    /// term key -> position posting list
97    position_index: HashMap<TermKey, PositionPostingListBuilder>,
98
99    /// Fields that have position tracking enabled, with their mode
100    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
101
102    /// Current element ordinal for multi-valued fields (reset per document)
103    current_element_ordinal: FxHashMap<u32, u32>,
104
105    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
106    estimated_memory: usize,
107}
108
109impl SegmentBuilder {
110    /// Create a new segment builder
111    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
112        let segment_id = uuid::Uuid::new_v4();
113        let store_path = config
114            .temp_dir
115            .join(format!("hermes_store_{}.tmp", segment_id));
116
117        let store_file = BufWriter::with_capacity(
118            STORE_BUFFER_SIZE,
119            OpenOptions::new()
120                .create(true)
121                .write(true)
122                .truncate(true)
123                .open(&store_path)?,
124        );
125
126        // Count indexed fields for compact field length storage
127        // Also track which fields have position recording enabled
128        let mut num_indexed_fields = 0;
129        let mut field_to_slot = FxHashMap::default();
130        let mut position_enabled_fields = FxHashMap::default();
131        for (field, entry) in schema.fields() {
132            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
133                field_to_slot.insert(field.0, num_indexed_fields);
134                num_indexed_fields += 1;
135                if entry.positions.is_some() {
136                    position_enabled_fields.insert(field.0, entry.positions);
137                }
138            }
139        }
140
141        Ok(Self {
142            schema,
143            tokenizers: FxHashMap::default(),
144            term_interner: Rodeo::new(),
145            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
146            store_file,
147            store_path,
148            next_doc_id: 0,
149            field_stats: FxHashMap::default(),
150            doc_field_lengths: Vec::new(),
151            num_indexed_fields,
152            field_to_slot,
153            local_tf_buffer: FxHashMap::default(),
154            token_buffer: String::with_capacity(64),
155            config,
156            dense_vectors: FxHashMap::default(),
157            sparse_vectors: FxHashMap::default(),
158            position_index: HashMap::new(),
159            position_enabled_fields,
160            current_element_ordinal: FxHashMap::default(),
161            estimated_memory: 0,
162        })
163    }
164
165    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
166        self.tokenizers.insert(field, tokenizer);
167    }
168
169    pub fn num_docs(&self) -> u32 {
170        self.next_doc_id
171    }
172
173    /// Fast O(1) memory estimate - updated incrementally during indexing
174    #[inline]
175    pub fn estimated_memory_bytes(&self) -> usize {
176        self.estimated_memory
177    }
178
179    /// Recalibrate incremental memory estimate using capacity-based calculation.
180    /// More expensive than estimated_memory_bytes() — O(terms + dims) vs O(1) —
181    /// but accounts for Vec capacity growth (doubling) and HashMap table overhead.
182    /// Call periodically (e.g. every 1000 docs) to prevent drift.
183    pub fn recalibrate_memory(&mut self) {
184        self.estimated_memory = self.stats().estimated_memory_bytes;
185    }
186
187    /// Count total unique sparse dimensions across all fields
188    pub fn sparse_dim_count(&self) -> usize {
189        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
190    }
191
192    /// Get current statistics for debugging performance (expensive - iterates all data)
193    pub fn stats(&self) -> SegmentBuilderStats {
194        use std::mem::size_of;
195
196        let postings_in_memory: usize =
197            self.inverted_index.values().map(|p| p.postings.len()).sum();
198
199        // Size constants computed from actual types
200        let compact_posting_size = size_of::<CompactPosting>();
201        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
202        let term_key_size = size_of::<TermKey>();
203        let posting_builder_size = size_of::<PostingListBuilder>();
204        let spur_size = size_of::<lasso::Spur>();
205        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
206
207        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
208        // Measured: ~(key_size + value_size + 8) per entry on average
209        let hashmap_entry_base_overhead = 8usize;
210
211        // FxHashMap uses same layout as hashbrown
212        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
213
214        // Postings memory
215        let postings_bytes: usize = self
216            .inverted_index
217            .values()
218            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
219            .sum();
220
221        // Inverted index overhead
222        let index_overhead_bytes = self.inverted_index.len()
223            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
224
225        // Term interner: Rodeo stores strings + metadata
226        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
227        let interner_arena_overhead = 2 * size_of::<usize>();
228        let avg_term_len = 8; // Estimated average term length
229        let interner_bytes =
230            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
231
232        // Doc field lengths
233        let field_lengths_bytes =
234            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
235
236        // Dense vectors
237        let mut dense_vectors_bytes: usize = 0;
238        let mut dense_vector_count: usize = 0;
239        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
240        for b in self.dense_vectors.values() {
241            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
242                + b.doc_ids.capacity() * doc_id_ordinal_size
243                + 2 * vec_overhead; // Two Vecs
244            dense_vector_count += b.doc_ids.len();
245        }
246
247        // Local buffers
248        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
249        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
250
251        // Sparse vectors
252        let mut sparse_vectors_bytes: usize = 0;
253        for builder in self.sparse_vectors.values() {
254            for postings in builder.postings.values() {
255                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
256            }
257            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
258            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
259            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
260        }
261        // Outer FxHashMap overhead
262        let outer_sparse_entry_size =
263            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
264        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
265
266        // Position index
267        let mut position_index_bytes: usize = 0;
268        for pos_builder in self.position_index.values() {
269            for (_, positions) in &pos_builder.postings {
270                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
271            }
272            // Vec<(DocId, Vec<u32>)> entry size
273            let pos_entry_size = size_of::<DocId>() + vec_overhead;
274            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
275        }
276        // HashMap overhead for position_index
277        let pos_index_entry_size =
278            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
279        position_index_bytes += self.position_index.len() * pos_index_entry_size;
280
281        let estimated_memory_bytes = postings_bytes
282            + index_overhead_bytes
283            + interner_bytes
284            + field_lengths_bytes
285            + dense_vectors_bytes
286            + local_tf_buffer_bytes
287            + sparse_vectors_bytes
288            + position_index_bytes;
289
290        let memory_breakdown = MemoryBreakdown {
291            postings_bytes,
292            index_overhead_bytes,
293            interner_bytes,
294            field_lengths_bytes,
295            dense_vectors_bytes,
296            dense_vector_count,
297            sparse_vectors_bytes,
298            position_index_bytes,
299        };
300
301        SegmentBuilderStats {
302            num_docs: self.next_doc_id,
303            unique_terms: self.inverted_index.len(),
304            postings_in_memory,
305            interned_strings: self.term_interner.len(),
306            doc_field_lengths_size: self.doc_field_lengths.len(),
307            estimated_memory_bytes,
308            memory_breakdown,
309        }
310    }
311
312    /// Add a document - streams to disk immediately
313    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
314        let doc_id = self.next_doc_id;
315        self.next_doc_id += 1;
316
317        // Initialize field lengths for this document
318        let base_idx = self.doc_field_lengths.len();
319        self.doc_field_lengths
320            .resize(base_idx + self.num_indexed_fields, 0);
321        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
322
323        // Reset element ordinals for this document (for multi-valued fields)
324        self.current_element_ordinal.clear();
325
326        for (field, value) in doc.field_values() {
327            let entry = self.schema.get_field_entry(*field);
328            if entry.is_none() {
329                continue;
330            }
331
332            let entry = entry.unwrap();
333            // Dense vectors are written to .vectors when indexed || stored
334            // Other field types require indexed
335            let dominated_by_index = matches!(&entry.field_type, FieldType::DenseVector);
336            if !dominated_by_index && !entry.indexed {
337                continue;
338            }
339
340            match (&entry.field_type, value) {
341                (FieldType::Text, FieldValue::Text(text)) => {
342                    // Get current element ordinal for multi-valued fields
343                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
344                    let token_count =
345                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
346                    // Increment element ordinal for next value of this field
347                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
348
349                    // Update field statistics
350                    let stats = self.field_stats.entry(field.0).or_default();
351                    stats.total_tokens += token_count as u64;
352                    // Only count each document once, even for multi-value fields
353                    if element_ordinal == 0 {
354                        stats.doc_count += 1;
355                    }
356
357                    // Store field length compactly
358                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
359                        self.doc_field_lengths[base_idx + slot] = token_count;
360                    }
361                }
362                (FieldType::U64, FieldValue::U64(v)) => {
363                    self.index_numeric_field(*field, doc_id, *v)?;
364                }
365                (FieldType::I64, FieldValue::I64(v)) => {
366                    self.index_numeric_field(*field, doc_id, *v as u64)?;
367                }
368                (FieldType::F64, FieldValue::F64(v)) => {
369                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
370                }
371                (FieldType::DenseVector, FieldValue::DenseVector(vec))
372                    if entry.indexed || entry.stored =>
373                {
374                    // Dense vectors written to .vectors (not .store) when indexed || stored
375                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
376                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
377                    // Increment element ordinal for next value of this field
378                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
379                }
380                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
381                    // Get current element ordinal for multi-valued fields
382                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
383                    self.index_sparse_vector_field(
384                        *field,
385                        doc_id,
386                        element_ordinal as u16,
387                        entries,
388                    )?;
389                    // Increment element ordinal for next value of this field
390                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
391                }
392                _ => {}
393            }
394        }
395
396        // Stream document to disk immediately
397        self.write_document_to_store(&doc)?;
398
399        Ok(doc_id)
400    }
401
402    /// Index a text field using interned terms
403    ///
404    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
405    /// Instead of allocating a String per token, we:
406    /// 1. Iterate over whitespace-split words
407    /// 2. Build lowercase token in a reusable buffer
408    /// 3. Intern directly from the buffer
409    ///
410    /// If position recording is enabled for this field, also records token positions
411    /// encoded as (element_ordinal << 20) | token_position.
412    fn index_text_field(
413        &mut self,
414        field: Field,
415        doc_id: DocId,
416        text: &str,
417        element_ordinal: u32,
418    ) -> Result<u32> {
419        use crate::dsl::PositionMode;
420
421        let field_id = field.0;
422        let position_mode = self
423            .position_enabled_fields
424            .get(&field_id)
425            .copied()
426            .flatten();
427
428        // Phase 1: Aggregate term frequencies within this document
429        // Also collect positions if enabled
430        // Reuse buffer to avoid allocations
431        self.local_tf_buffer.clear();
432
433        // For position tracking: term -> list of positions in this text
434        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
435
436        let mut token_position = 0u32;
437
438        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
439        for word in text.split_whitespace() {
440            // Build lowercase token in reusable buffer
441            self.token_buffer.clear();
442            for c in word.chars() {
443                if c.is_alphanumeric() {
444                    for lc in c.to_lowercase() {
445                        self.token_buffer.push(lc);
446                    }
447                }
448            }
449
450            if self.token_buffer.is_empty() {
451                continue;
452            }
453
454            // Intern the term directly from buffer - O(1) amortized
455            let is_new_string = !self.term_interner.contains(&self.token_buffer);
456            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
457            if is_new_string {
458                use std::mem::size_of;
459                // string bytes + Spur + arena overhead (2 pointers)
460                self.estimated_memory +=
461                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
462            }
463            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
464
465            // Record position based on mode
466            if let Some(mode) = position_mode {
467                let encoded_pos = match mode {
468                    // Ordinal only: just store element ordinal (token position = 0)
469                    PositionMode::Ordinal => element_ordinal << 20,
470                    // Token position only: just store token position (ordinal = 0)
471                    PositionMode::TokenPosition => token_position,
472                    // Full: encode both
473                    PositionMode::Full => (element_ordinal << 20) | token_position,
474                };
475                local_positions
476                    .entry(term_spur)
477                    .or_default()
478                    .push(encoded_pos);
479            }
480
481            token_position += 1;
482        }
483
484        // Phase 2: Insert aggregated terms into inverted index
485        // Now we only do one inverted_index lookup per unique term in doc
486        for (&term_spur, &tf) in &self.local_tf_buffer {
487            let term_key = TermKey {
488                field: field_id,
489                term: term_spur,
490            };
491
492            let is_new_term = !self.inverted_index.contains_key(&term_key);
493            let posting = self
494                .inverted_index
495                .entry(term_key)
496                .or_insert_with(PostingListBuilder::new);
497            posting.add(doc_id, tf);
498
499            // Incremental memory tracking
500            use std::mem::size_of;
501            self.estimated_memory += size_of::<CompactPosting>();
502            if is_new_term {
503                // HashMap entry overhead + PostingListBuilder + Vec header
504                self.estimated_memory +=
505                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
506            }
507
508            // Add positions if enabled
509            if position_mode.is_some()
510                && let Some(positions) = local_positions.get(&term_spur)
511            {
512                let is_new_pos_term = !self.position_index.contains_key(&term_key);
513                let pos_posting = self
514                    .position_index
515                    .entry(term_key)
516                    .or_insert_with(PositionPostingListBuilder::new);
517                for &pos in positions {
518                    pos_posting.add_position(doc_id, pos);
519                }
520                // Incremental memory tracking for position index
521                self.estimated_memory += positions.len() * size_of::<u32>();
522                if is_new_pos_term {
523                    self.estimated_memory +=
524                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
525                }
526            }
527        }
528
529        Ok(token_position)
530    }
531
532    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
533        use std::mem::size_of;
534
535        // For numeric fields, we use a special encoding
536        let term_str = format!("__num_{}", value);
537        let is_new_string = !self.term_interner.contains(&term_str);
538        let term_spur = self.term_interner.get_or_intern(&term_str);
539
540        let term_key = TermKey {
541            field: field.0,
542            term: term_spur,
543        };
544
545        let is_new_term = !self.inverted_index.contains_key(&term_key);
546        let posting = self
547            .inverted_index
548            .entry(term_key)
549            .or_insert_with(PostingListBuilder::new);
550        posting.add(doc_id, 1);
551
552        // Incremental memory tracking
553        self.estimated_memory += size_of::<CompactPosting>();
554        if is_new_term {
555            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
556        }
557        if is_new_string {
558            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
559        }
560
561        Ok(())
562    }
563
564    /// Index a dense vector field with ordinal tracking
565    fn index_dense_vector_field(
566        &mut self,
567        field: Field,
568        doc_id: DocId,
569        ordinal: u16,
570        vector: &[f32],
571    ) -> Result<()> {
572        let dim = vector.len();
573
574        let builder = self
575            .dense_vectors
576            .entry(field.0)
577            .or_insert_with(|| DenseVectorBuilder::new(dim));
578
579        // Verify dimension consistency
580        if builder.dim != dim && builder.len() > 0 {
581            return Err(crate::Error::Schema(format!(
582                "Dense vector dimension mismatch: expected {}, got {}",
583                builder.dim, dim
584            )));
585        }
586
587        builder.add(doc_id, ordinal, vector);
588
589        // Incremental memory tracking
590        use std::mem::{size_of, size_of_val};
591        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
592
593        Ok(())
594    }
595
596    /// Index a sparse vector field using dedicated sparse posting lists
597    ///
598    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
599    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
600    ///
601    /// Weights below the configured `weight_threshold` are not indexed.
602    fn index_sparse_vector_field(
603        &mut self,
604        field: Field,
605        doc_id: DocId,
606        ordinal: u16,
607        entries: &[(u32, f32)],
608    ) -> Result<()> {
609        // Get weight threshold from field config (default 0.0 = no filtering)
610        let weight_threshold = self
611            .schema
612            .get_field_entry(field)
613            .and_then(|entry| entry.sparse_vector_config.as_ref())
614            .map(|config| config.weight_threshold)
615            .unwrap_or(0.0);
616
617        let builder = self
618            .sparse_vectors
619            .entry(field.0)
620            .or_insert_with(SparseVectorBuilder::new);
621
622        for &(dim_id, weight) in entries {
623            // Skip weights below threshold
624            if weight.abs() < weight_threshold {
625                continue;
626            }
627
628            // Incremental memory tracking
629            use std::mem::size_of;
630            let is_new_dim = !builder.postings.contains_key(&dim_id);
631            builder.add(dim_id, doc_id, ordinal, weight);
632            self.estimated_memory += size_of::<(DocId, u16, f32)>();
633            if is_new_dim {
634                // HashMap entry overhead + Vec header
635                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
636            }
637        }
638
639        Ok(())
640    }
641
642    /// Write document to streaming store
643    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
644        use byteorder::{LittleEndian, WriteBytesExt};
645
646        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
647
648        self.store_file
649            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
650        self.store_file.write_all(&doc_bytes)?;
651
652        Ok(())
653    }
654
655    /// Build the final segment
656    ///
657    /// Streams all data directly to disk via StreamingWriter to avoid buffering
658    /// entire serialized outputs in memory. Each phase consumes and drops its
659    /// source data before the next phase begins.
660    pub async fn build<D: Directory + DirectoryWriter>(
661        mut self,
662        dir: &D,
663        segment_id: SegmentId,
664        trained: Option<&super::TrainedVectorStructures>,
665    ) -> Result<SegmentMeta> {
666        // Flush any buffered data
667        self.store_file.flush()?;
668
669        let files = SegmentFiles::new(segment_id.0);
670
671        // Phase 1: Stream positions directly to disk (consumes position_index)
672        let position_index = std::mem::take(&mut self.position_index);
673        let position_offsets = if !position_index.is_empty() {
674            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
675            let offsets = Self::build_positions_streaming(
676                position_index,
677                &self.term_interner,
678                &mut *pos_writer,
679            )?;
680            pos_writer.finish()?;
681            offsets
682        } else {
683            FxHashMap::default()
684        };
685
686        // Phase 2: Build postings + store — stream directly to disk
687        let inverted_index = std::mem::take(&mut self.inverted_index);
688        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
689        let store_path = self.store_path.clone();
690        let schema_clone = self.schema.clone();
691        let num_compression_threads = self.config.num_compression_threads;
692        let compression_level = self.config.compression_level;
693
694        // Create streaming writers (async) before entering sync build phases
695        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
696        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
697        let mut store_writer = dir.streaming_writer(&files.store).await?;
698
699        let (postings_result, store_result) = rayon::join(
700            || {
701                Self::build_postings_streaming(
702                    inverted_index,
703                    term_interner,
704                    &position_offsets,
705                    &mut *term_dict_writer,
706                    &mut *postings_writer,
707                )
708            },
709            || {
710                Self::build_store_streaming(
711                    &store_path,
712                    &schema_clone,
713                    num_compression_threads,
714                    compression_level,
715                    &mut *store_writer,
716                )
717            },
718        );
719        postings_result?;
720        store_result?;
721        term_dict_writer.finish()?;
722        postings_writer.finish()?;
723        store_writer.finish()?;
724        drop(position_offsets);
725
726        // Phase 3: Dense vectors — stream directly to disk
727        let dense_vectors = std::mem::take(&mut self.dense_vectors);
728        if !dense_vectors.is_empty() {
729            let mut writer = dir.streaming_writer(&files.vectors).await?;
730            Self::build_vectors_streaming(dense_vectors, &self.schema, trained, &mut *writer)?;
731            writer.finish()?;
732        }
733
734        // Phase 4: Sparse vectors — stream directly to disk
735        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
736        if !sparse_vectors.is_empty() {
737            let mut writer = dir.streaming_writer(&files.sparse).await?;
738            Self::build_sparse_streaming(&mut sparse_vectors, &self.schema, &mut *writer)?;
739            drop(sparse_vectors);
740            writer.finish()?;
741        }
742
743        let meta = SegmentMeta {
744            id: segment_id.0,
745            num_docs: self.next_doc_id,
746            field_stats: self.field_stats.clone(),
747        };
748
749        dir.write(&files.meta, &meta.serialize()?).await?;
750
751        // Cleanup temp files
752        let _ = std::fs::remove_file(&self.store_path);
753
754        Ok(meta)
755    }
756
757    /// Stream dense vectors directly to disk (zero-buffer for vector data).
758    ///
759    /// Computes sizes deterministically (no trial serialization needed), writes
760    /// a small header, then streams each field's raw f32 data directly to the writer.
761    fn build_vectors_streaming(
762        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
763        schema: &Schema,
764        trained: Option<&super::TrainedVectorStructures>,
765        writer: &mut dyn Write,
766    ) -> Result<()> {
767        use crate::dsl::{DenseVectorQuantization, VectorIndexType};
768        use byteorder::{LittleEndian, WriteBytesExt};
769
770        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
771            .into_iter()
772            .filter(|(_, b)| b.len() > 0)
773            .collect();
774        fields.sort_by_key(|(id, _)| *id);
775
776        if fields.is_empty() {
777            return Ok(());
778        }
779
780        // Resolve quantization config per field from schema
781        let quants: Vec<DenseVectorQuantization> = fields
782            .iter()
783            .map(|(field_id, _)| {
784                schema
785                    .get_field_entry(Field(*field_id))
786                    .and_then(|e| e.dense_vector_config.as_ref())
787                    .map(|c| c.quantization)
788                    .unwrap_or(DenseVectorQuantization::F32)
789            })
790            .collect();
791
792        // Compute sizes using deterministic formula (no serialization needed)
793        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
794        for (i, (_field_id, builder)) in fields.iter().enumerate() {
795            field_sizes.push(FlatVectorData::serialized_binary_size(
796                builder.dim,
797                builder.len(),
798                quants[i],
799            ));
800        }
801
802        /// Magic number for vectors file footer ("VEC2" in LE)
803        const VECTORS_FOOTER_MAGIC: u32 = 0x32434556;
804
805        // Data-first format: stream field data, then write TOC + footer at end.
806        // Data starts at file offset 0 → mmap page-aligned, no alignment copies.
807        struct TocEntry {
808            field_id: u32,
809            index_type: u8,
810            offset: u64,
811            size: u64,
812        }
813        let mut toc: Vec<TocEntry> = Vec::with_capacity(fields.len() * 2);
814        let mut current_offset = 0u64;
815
816        // Pre-build ANN indexes while we still have access to the raw vectors.
817        // Each ANN blob is (field_id, index_type, serialized_bytes).
818        let mut ann_blobs: Vec<(u32, u8, Vec<u8>)> = Vec::new();
819        if let Some(trained) = trained {
820            for (field_id, builder) in &fields {
821                let config = schema
822                    .get_field_entry(Field(*field_id))
823                    .and_then(|e| e.dense_vector_config.as_ref());
824
825                if let Some(config) = config {
826                    let dim = builder.dim;
827                    let blob = match config.index_type {
828                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
829                            let centroids = &trained.centroids[field_id];
830                            let (mut index, codebook) =
831                                super::ann_build::new_ivf_rabitq(dim, centroids);
832                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
833                                let v = &builder.vectors[i * dim..(i + 1) * dim];
834                                index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
835                            }
836                            super::ann_build::serialize_ivf_rabitq(index, codebook)
837                                .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
838                        }
839                        VectorIndexType::ScaNN
840                            if trained.centroids.contains_key(field_id)
841                                && trained.codebooks.contains_key(field_id) =>
842                        {
843                            let centroids = &trained.centroids[field_id];
844                            let codebook = &trained.codebooks[field_id];
845                            let mut index = super::ann_build::new_scann(dim, centroids, codebook);
846                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
847                                let v = &builder.vectors[i * dim..(i + 1) * dim];
848                                index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
849                            }
850                            super::ann_build::serialize_scann(index, codebook)
851                                .map(|b| (super::ann_build::SCANN_TYPE, b))
852                        }
853                        _ => continue,
854                    };
855                    match blob {
856                        Ok((index_type, bytes)) => {
857                            log::info!(
858                                "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
859                                index_type,
860                                field_id,
861                                builder.doc_ids.len(),
862                                bytes.len()
863                            );
864                            ann_blobs.push((*field_id, index_type, bytes));
865                        }
866                        Err(e) => {
867                            log::warn!(
868                                "[segment_build] ANN serialize failed for field {}: {}",
869                                field_id,
870                                e
871                            );
872                        }
873                    }
874                }
875            }
876        }
877
878        // Stream each field's flat data directly (builder → disk, no intermediate buffer)
879        for (i, (_field_id, builder)) in fields.into_iter().enumerate() {
880            let data_offset = current_offset;
881            FlatVectorData::serialize_binary_from_flat_streaming(
882                builder.dim,
883                &builder.vectors,
884                &builder.doc_ids,
885                quants[i],
886                writer,
887            )
888            .map_err(crate::Error::Io)?;
889            current_offset += field_sizes[i] as u64;
890            toc.push(TocEntry {
891                field_id: _field_id,
892                index_type: super::ann_build::FLAT_TYPE,
893                offset: data_offset,
894                size: field_sizes[i] as u64,
895            });
896            // Pad to 8-byte boundary so next field's mmap slice is aligned
897            let pad = (8 - (current_offset % 8)) % 8;
898            if pad > 0 {
899                writer.write_all(&[0u8; 8][..pad as usize])?;
900                current_offset += pad;
901            }
902            // builder dropped here, freeing vector memory before next field
903        }
904
905        // Write ANN blob entries after flat entries
906        for (field_id, index_type, blob) in ann_blobs {
907            let data_offset = current_offset;
908            let blob_len = blob.len() as u64;
909            writer.write_all(&blob)?;
910            current_offset += blob_len;
911            toc.push(TocEntry {
912                field_id,
913                index_type,
914                offset: data_offset,
915                size: blob_len,
916            });
917            let pad = (8 - (current_offset % 8)) % 8;
918            if pad > 0 {
919                writer.write_all(&[0u8; 8][..pad as usize])?;
920                current_offset += pad;
921            }
922        }
923
924        // Write TOC entries
925        let toc_offset = current_offset;
926        for entry in &toc {
927            writer.write_u32::<LittleEndian>(entry.field_id)?;
928            writer.write_u8(entry.index_type)?;
929            writer.write_u64::<LittleEndian>(entry.offset)?;
930            writer.write_u64::<LittleEndian>(entry.size)?;
931        }
932
933        // Write footer: toc_offset(8) + num_fields(4) + magic(4)
934        writer.write_u64::<LittleEndian>(toc_offset)?;
935        writer.write_u32::<LittleEndian>(toc.len() as u32)?;
936        writer.write_u32::<LittleEndian>(VECTORS_FOOTER_MAGIC)?;
937
938        Ok(())
939    }
940
941    /// Stream sparse vectors directly to disk.
942    ///
943    /// Serializes per-dimension posting lists (needed for header offset computation),
944    /// writes header, then streams each dimension's data directly to the writer.
945    /// Eliminates the triple-buffered approach (dim_bytes + all_data + output).
946    fn build_sparse_streaming(
947        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
948        schema: &Schema,
949        writer: &mut dyn Write,
950    ) -> Result<()> {
951        use crate::structures::{BlockSparsePostingList, WeightQuantization};
952        use byteorder::{LittleEndian, WriteBytesExt};
953
954        if sparse_vectors.is_empty() {
955            return Ok(());
956        }
957
958        // Phase 1: Serialize all dims, keep bytes for offset computation
959        type DimEntry = (u32, Vec<u8>);
960        type FieldEntry = (u32, WeightQuantization, Vec<DimEntry>);
961        let mut field_data: Vec<FieldEntry> = Vec::new();
962
963        for (&field_id, builder) in sparse_vectors.iter_mut() {
964            if builder.is_empty() {
965                continue;
966            }
967
968            let field = crate::dsl::Field(field_id);
969            let sparse_config = schema
970                .get_field_entry(field)
971                .and_then(|e| e.sparse_vector_config.as_ref());
972
973            let quantization = sparse_config
974                .map(|c| c.weight_quantization)
975                .unwrap_or(WeightQuantization::Float32);
976
977            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
978            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);
979
980            let mut dims: Vec<DimEntry> = Vec::new();
981
982            for (&dim_id, postings) in builder.postings.iter_mut() {
983                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
984
985                if let Some(fraction) = pruning_fraction
986                    && postings.len() > 1
987                    && fraction < 1.0
988                {
989                    let original_len = postings.len();
990                    postings.sort_by(|a, b| {
991                        b.2.abs()
992                            .partial_cmp(&a.2.abs())
993                            .unwrap_or(std::cmp::Ordering::Equal)
994                    });
995                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
996                    postings.truncate(keep);
997                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
998                }
999
1000                let block_list = BlockSparsePostingList::from_postings_with_block_size(
1001                    postings,
1002                    quantization,
1003                    block_size,
1004                )
1005                .map_err(crate::Error::Io)?;
1006
1007                let mut bytes = Vec::new();
1008                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
1009                dims.push((dim_id, bytes));
1010            }
1011
1012            dims.sort_by_key(|(id, _)| *id);
1013            field_data.push((field_id, quantization, dims));
1014        }
1015
1016        if field_data.is_empty() {
1017            return Ok(());
1018        }
1019
1020        field_data.sort_by_key(|(id, _, _)| *id);
1021
1022        // Phase 2: Compute header size and offsets
1023        // num_fields(u32) + per-field: field_id(u32) + quantization(u8) + num_dims(u32)
1024        // per-dim: dim_id(u32) + offset(u64) + length(u32)
1025        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
1026        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
1027        let mut header_size = size_of::<u32>() as u64;
1028        for (_, _, dims) in &field_data {
1029            header_size += per_field_header as u64;
1030            header_size += (dims.len() as u64) * per_dim_entry as u64;
1031        }
1032
1033        let mut header = Vec::with_capacity(header_size as usize);
1034        header.write_u32::<LittleEndian>(field_data.len() as u32)?;
1035
1036        let mut current_offset = header_size;
1037        for (field_id, quantization, dims) in &field_data {
1038            header.write_u32::<LittleEndian>(*field_id)?;
1039            header.write_u8(*quantization as u8)?;
1040            header.write_u32::<LittleEndian>(dims.len() as u32)?;
1041
1042            for (dim_id, bytes) in dims {
1043                header.write_u32::<LittleEndian>(*dim_id)?;
1044                header.write_u64::<LittleEndian>(current_offset)?;
1045                header.write_u32::<LittleEndian>(bytes.len() as u32)?;
1046                current_offset += bytes.len() as u64;
1047            }
1048        }
1049
1050        // Phase 3: Write header, then stream each dimension's data
1051        writer.write_all(&header)?;
1052
1053        for (_, _, dims) in field_data {
1054            for (_, bytes) in dims {
1055                writer.write_all(&bytes)?;
1056                // bytes dropped here — one dim at a time
1057            }
1058        }
1059
1060        Ok(())
1061    }
1062
1063    /// Stream positions directly to disk, returning only the offset map.
1064    ///
1065    /// Consumes the position_index and writes each position posting list
1066    /// directly to the writer, tracking offsets for the postings phase.
1067    fn build_positions_streaming(
1068        position_index: HashMap<TermKey, PositionPostingListBuilder>,
1069        term_interner: &Rodeo,
1070        writer: &mut dyn Write,
1071    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
1072        use crate::structures::PositionPostingList;
1073
1074        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1075
1076        // Consume HashMap into Vec for sorting (owned, no borrowing)
1077        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
1078            .into_iter()
1079            .map(|(term_key, pos_builder)| {
1080                let term_str = term_interner.resolve(&term_key.term);
1081                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
1082                key.extend_from_slice(&term_key.field.to_le_bytes());
1083                key.extend_from_slice(term_str.as_bytes());
1084                (key, pos_builder)
1085            })
1086            .collect();
1087
1088        entries.sort_by(|a, b| a.0.cmp(&b.0));
1089
1090        let mut current_offset = 0u64;
1091
1092        for (key, pos_builder) in entries {
1093            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1094            for (doc_id, positions) in pos_builder.postings {
1095                pos_list.push(doc_id, positions);
1096            }
1097
1098            // Serialize to a temp buffer to get exact length, then write
1099            let mut buf = Vec::new();
1100            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
1101            writer.write_all(&buf)?;
1102
1103            position_offsets.insert(key, (current_offset, buf.len() as u32));
1104            current_offset += buf.len() as u64;
1105        }
1106
1107        Ok(position_offsets)
1108    }
1109
1110    /// Stream postings directly to disk.
1111    ///
1112    /// Parallel serialization of posting lists, then sequential streaming of
1113    /// term dict and postings data directly to writers (no Vec<u8> accumulation).
1114    fn build_postings_streaming(
1115        inverted_index: HashMap<TermKey, PostingListBuilder>,
1116        term_interner: Rodeo,
1117        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1118        term_dict_writer: &mut dyn Write,
1119        postings_writer: &mut dyn Write,
1120    ) -> Result<()> {
1121        // Phase 1: Consume HashMap into sorted Vec (frees HashMap overhead)
1122        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
1123            .into_iter()
1124            .map(|(term_key, posting_list)| {
1125                let term_str = term_interner.resolve(&term_key.term);
1126                let mut key = Vec::with_capacity(4 + term_str.len());
1127                key.extend_from_slice(&term_key.field.to_le_bytes());
1128                key.extend_from_slice(term_str.as_bytes());
1129                (key, posting_list)
1130            })
1131            .collect();
1132
1133        drop(term_interner);
1134
1135        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1136
1137        // Phase 2: Parallel serialization
1138        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1139            .into_par_iter()
1140            .map(|(key, posting_builder)| {
1141                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1142                for p in &posting_builder.postings {
1143                    full_postings.push(p.doc_id, p.term_freq as u32);
1144                }
1145
1146                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1147                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1148
1149                let has_positions = position_offsets.contains_key(&key);
1150                let result = if !has_positions
1151                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1152                {
1153                    SerializedPosting::Inline(inline)
1154                } else {
1155                    let mut posting_bytes = Vec::new();
1156                    let block_list =
1157                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
1158                    block_list.serialize(&mut posting_bytes)?;
1159                    SerializedPosting::External {
1160                        bytes: posting_bytes,
1161                        doc_count: full_postings.doc_count(),
1162                    }
1163                };
1164
1165                Ok((key, result))
1166            })
1167            .collect::<Result<Vec<_>>>()?;
1168
1169        // Phase 3: Stream directly to writers (no intermediate Vec<u8> accumulation)
1170        let mut postings_offset = 0u64;
1171        let mut writer = SSTableWriter::<TermInfo>::new(term_dict_writer);
1172
1173        for (key, serialized_posting) in serialized {
1174            let term_info = match serialized_posting {
1175                SerializedPosting::Inline(info) => info,
1176                SerializedPosting::External { bytes, doc_count } => {
1177                    let posting_len = bytes.len() as u32;
1178                    postings_writer.write_all(&bytes)?;
1179
1180                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1181                        TermInfo::external_with_positions(
1182                            postings_offset,
1183                            posting_len,
1184                            doc_count,
1185                            pos_offset,
1186                            pos_len,
1187                        )
1188                    } else {
1189                        TermInfo::external(postings_offset, posting_len, doc_count)
1190                    };
1191                    postings_offset += posting_len as u64;
1192                    info
1193                }
1194            };
1195
1196            writer.insert(&key, &term_info)?;
1197        }
1198
1199        writer.finish()?;
1200        Ok(())
1201    }
1202
1203    /// Stream compressed document store directly to disk.
1204    ///
1205    /// Reads documents from temp file, compresses in parallel batches,
1206    /// and writes directly to the streaming writer (no Vec<u8> accumulation).
1207    fn build_store_streaming(
1208        store_path: &PathBuf,
1209        schema: &Schema,
1210        num_compression_threads: usize,
1211        compression_level: CompressionLevel,
1212        writer: &mut dyn Write,
1213    ) -> Result<()> {
1214        use super::store::EagerParallelStoreWriter;
1215
1216        let file = File::open(store_path)?;
1217        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1218
1219        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1220        let mut offset = 0usize;
1221        while offset + 4 <= mmap.len() {
1222            let doc_len = u32::from_le_bytes([
1223                mmap[offset],
1224                mmap[offset + 1],
1225                mmap[offset + 2],
1226                mmap[offset + 3],
1227            ]) as usize;
1228            offset += 4;
1229
1230            if offset + doc_len > mmap.len() {
1231                break;
1232            }
1233
1234            doc_ranges.push((offset, doc_len));
1235            offset += doc_len;
1236        }
1237
1238        const BATCH_SIZE: usize = 10_000;
1239        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1240            writer,
1241            num_compression_threads,
1242            compression_level,
1243        );
1244
1245        for batch in doc_ranges.chunks(BATCH_SIZE) {
1246            let batch_docs: Vec<Document> = batch
1247                .par_iter()
1248                .filter_map(|&(start, len)| {
1249                    let doc_bytes = &mmap[start..start + len];
1250                    super::store::deserialize_document(doc_bytes, schema).ok()
1251                })
1252                .collect();
1253
1254            for doc in &batch_docs {
1255                store_writer.store(doc, schema)?;
1256            }
1257        }
1258
1259        store_writer.finish()?;
1260        Ok(())
1261    }
1262}
1263
1264impl Drop for SegmentBuilder {
1265    fn drop(&mut self) {
1266        // Cleanup temp files on drop
1267        let _ = std::fs::remove_file(&self.store_path);
1268    }
1269}