Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11mod config;
12mod posting;
13mod vectors;
14
15pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
16
17use std::fs::{File, OpenOptions};
18use std::io::{BufWriter, Write};
19use std::path::PathBuf;
20
21use hashbrown::HashMap;
22use lasso::{Rodeo, Spur};
23use rayon::prelude::*;
24use rustc_hash::FxHashMap;
25
26use crate::compression::CompressionLevel;
27
28use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
29use crate::directories::{Directory, DirectoryWriter};
30use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
31use crate::structures::{PostingList, SSTableWriter, TermInfo};
32use crate::tokenizer::BoxedTokenizer;
33use crate::{DocId, Result};
34
35use posting::{
36    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
37};
38use vectors::{DenseVectorBuilder, SparseVectorBuilder};
39
40// Re-export from vector_data for backwards compatibility
41pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
42
/// Capacity, in bytes, of the `BufWriter` that wraps the temporary document
/// store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 << 20;
45
/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// Lifecycle: create with `new`, feed documents through `add_document`, then
/// call `build`, which consumes the builder and writes the segment files.
pub struct SegmentBuilder {
    /// Schema consulted to decide which fields are indexed and how
    schema: Schema,
    /// Builder configuration (temp dir, map capacity, compression settings)
    config: SegmentBuilderConfig,
    /// Tokenizers registered per field through `set_tokenizer`
    // NOTE(review): the text indexing path visible in this file tokenizes
    // inline (whitespace split + lowercase) and does not consult this map —
    // confirm whether registered tokenizers are meant to be applied.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer (buffered, backed by a temp file)
    store_file: BufWriter<File>,
    /// Path of the temp file behind `store_file`; removed at the end of `build`
    store_path: PathBuf,

    /// Id that the next added document will receive; doubles as the document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields, i.e. the row width of `doc_field_lengths`
    num_indexed_fields: usize,
    /// Field id -> column (slot) inside a document's row of `doc_field_lengths`
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    estimated_memory: usize,
}
108
109impl SegmentBuilder {
110    /// Create a new segment builder
111    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
112        let segment_id = uuid::Uuid::new_v4();
113        let store_path = config
114            .temp_dir
115            .join(format!("hermes_store_{}.tmp", segment_id));
116
117        let store_file = BufWriter::with_capacity(
118            STORE_BUFFER_SIZE,
119            OpenOptions::new()
120                .create(true)
121                .write(true)
122                .truncate(true)
123                .open(&store_path)?,
124        );
125
126        // Count indexed fields for compact field length storage
127        // Also track which fields have position recording enabled
128        let mut num_indexed_fields = 0;
129        let mut field_to_slot = FxHashMap::default();
130        let mut position_enabled_fields = FxHashMap::default();
131        for (field, entry) in schema.fields() {
132            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
133                field_to_slot.insert(field.0, num_indexed_fields);
134                num_indexed_fields += 1;
135                if entry.positions.is_some() {
136                    position_enabled_fields.insert(field.0, entry.positions);
137                }
138            }
139        }
140
141        Ok(Self {
142            schema,
143            tokenizers: FxHashMap::default(),
144            term_interner: Rodeo::new(),
145            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
146            store_file,
147            store_path,
148            next_doc_id: 0,
149            field_stats: FxHashMap::default(),
150            doc_field_lengths: Vec::new(),
151            num_indexed_fields,
152            field_to_slot,
153            local_tf_buffer: FxHashMap::default(),
154            token_buffer: String::with_capacity(64),
155            config,
156            dense_vectors: FxHashMap::default(),
157            sparse_vectors: FxHashMap::default(),
158            position_index: HashMap::new(),
159            position_enabled_fields,
160            current_element_ordinal: FxHashMap::default(),
161            estimated_memory: 0,
162        })
163    }
164
165    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
166        self.tokenizers.insert(field, tokenizer);
167    }
168
169    pub fn num_docs(&self) -> u32 {
170        self.next_doc_id
171    }
172
173    /// Fast O(1) memory estimate - updated incrementally during indexing
174    #[inline]
175    pub fn estimated_memory_bytes(&self) -> usize {
176        self.estimated_memory
177    }
178
179    /// Recalibrate incremental memory estimate using capacity-based calculation.
180    /// More expensive than estimated_memory_bytes() — O(terms + dims) vs O(1) —
181    /// but accounts for Vec capacity growth (doubling) and HashMap table overhead.
182    /// Call periodically (e.g. every 1000 docs) to prevent drift.
183    pub fn recalibrate_memory(&mut self) {
184        self.estimated_memory = self.stats().estimated_memory_bytes;
185    }
186
187    /// Count total unique sparse dimensions across all fields
188    pub fn sparse_dim_count(&self) -> usize {
189        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
190    }
191
192    /// Get current statistics for debugging performance (expensive - iterates all data)
193    pub fn stats(&self) -> SegmentBuilderStats {
194        use std::mem::size_of;
195
196        let postings_in_memory: usize =
197            self.inverted_index.values().map(|p| p.postings.len()).sum();
198
199        // Size constants computed from actual types
200        let compact_posting_size = size_of::<CompactPosting>();
201        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
202        let term_key_size = size_of::<TermKey>();
203        let posting_builder_size = size_of::<PostingListBuilder>();
204        let spur_size = size_of::<lasso::Spur>();
205        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
206
207        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
208        // Measured: ~(key_size + value_size + 8) per entry on average
209        let hashmap_entry_base_overhead = 8usize;
210
211        // FxHashMap uses same layout as hashbrown
212        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
213
214        // Postings memory
215        let postings_bytes: usize = self
216            .inverted_index
217            .values()
218            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
219            .sum();
220
221        // Inverted index overhead
222        let index_overhead_bytes = self.inverted_index.len()
223            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
224
225        // Term interner: Rodeo stores strings + metadata
226        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
227        let interner_arena_overhead = 2 * size_of::<usize>();
228        let avg_term_len = 8; // Estimated average term length
229        let interner_bytes =
230            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
231
232        // Doc field lengths
233        let field_lengths_bytes =
234            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
235
236        // Dense vectors
237        let mut dense_vectors_bytes: usize = 0;
238        let mut dense_vector_count: usize = 0;
239        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
240        for b in self.dense_vectors.values() {
241            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
242                + b.doc_ids.capacity() * doc_id_ordinal_size
243                + 2 * vec_overhead; // Two Vecs
244            dense_vector_count += b.doc_ids.len();
245        }
246
247        // Local buffers
248        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
249        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
250
251        // Sparse vectors
252        let mut sparse_vectors_bytes: usize = 0;
253        for builder in self.sparse_vectors.values() {
254            for postings in builder.postings.values() {
255                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
256            }
257            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
258            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
259            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
260        }
261        // Outer FxHashMap overhead
262        let outer_sparse_entry_size =
263            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
264        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
265
266        // Position index
267        let mut position_index_bytes: usize = 0;
268        for pos_builder in self.position_index.values() {
269            for (_, positions) in &pos_builder.postings {
270                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
271            }
272            // Vec<(DocId, Vec<u32>)> entry size
273            let pos_entry_size = size_of::<DocId>() + vec_overhead;
274            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
275        }
276        // HashMap overhead for position_index
277        let pos_index_entry_size =
278            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
279        position_index_bytes += self.position_index.len() * pos_index_entry_size;
280
281        let estimated_memory_bytes = postings_bytes
282            + index_overhead_bytes
283            + interner_bytes
284            + field_lengths_bytes
285            + dense_vectors_bytes
286            + local_tf_buffer_bytes
287            + sparse_vectors_bytes
288            + position_index_bytes;
289
290        let memory_breakdown = MemoryBreakdown {
291            postings_bytes,
292            index_overhead_bytes,
293            interner_bytes,
294            field_lengths_bytes,
295            dense_vectors_bytes,
296            dense_vector_count,
297            sparse_vectors_bytes,
298            position_index_bytes,
299        };
300
301        SegmentBuilderStats {
302            num_docs: self.next_doc_id,
303            unique_terms: self.inverted_index.len(),
304            postings_in_memory,
305            interned_strings: self.term_interner.len(),
306            doc_field_lengths_size: self.doc_field_lengths.len(),
307            estimated_memory_bytes,
308            memory_breakdown,
309        }
310    }
311
312    /// Add a document - streams to disk immediately
313    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
314        let doc_id = self.next_doc_id;
315        self.next_doc_id += 1;
316
317        // Initialize field lengths for this document
318        let base_idx = self.doc_field_lengths.len();
319        self.doc_field_lengths
320            .resize(base_idx + self.num_indexed_fields, 0);
321        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
322
323        // Reset element ordinals for this document (for multi-valued fields)
324        self.current_element_ordinal.clear();
325
326        for (field, value) in doc.field_values() {
327            let entry = self.schema.get_field_entry(*field);
328            if entry.is_none() || !entry.unwrap().indexed {
329                continue;
330            }
331
332            let entry = entry.unwrap();
333            match (&entry.field_type, value) {
334                (FieldType::Text, FieldValue::Text(text)) => {
335                    // Get current element ordinal for multi-valued fields
336                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
337                    let token_count =
338                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
339                    // Increment element ordinal for next value of this field
340                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
341
342                    // Update field statistics
343                    let stats = self.field_stats.entry(field.0).or_default();
344                    stats.total_tokens += token_count as u64;
345                    // Only count each document once, even for multi-value fields
346                    if element_ordinal == 0 {
347                        stats.doc_count += 1;
348                    }
349
350                    // Store field length compactly
351                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
352                        self.doc_field_lengths[base_idx + slot] = token_count;
353                    }
354                }
355                (FieldType::U64, FieldValue::U64(v)) => {
356                    self.index_numeric_field(*field, doc_id, *v)?;
357                }
358                (FieldType::I64, FieldValue::I64(v)) => {
359                    self.index_numeric_field(*field, doc_id, *v as u64)?;
360                }
361                (FieldType::F64, FieldValue::F64(v)) => {
362                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
363                }
364                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
365                    // Get current element ordinal for multi-valued fields
366                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
367                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
368                    // Increment element ordinal for next value of this field
369                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
370                }
371                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
372                    // Get current element ordinal for multi-valued fields
373                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
374                    self.index_sparse_vector_field(
375                        *field,
376                        doc_id,
377                        element_ordinal as u16,
378                        entries,
379                    )?;
380                    // Increment element ordinal for next value of this field
381                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
382                }
383                _ => {}
384            }
385        }
386
387        // Stream document to disk immediately
388        self.write_document_to_store(&doc)?;
389
390        Ok(doc_id)
391    }
392
393    /// Index a text field using interned terms
394    ///
395    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
396    /// Instead of allocating a String per token, we:
397    /// 1. Iterate over whitespace-split words
398    /// 2. Build lowercase token in a reusable buffer
399    /// 3. Intern directly from the buffer
400    ///
401    /// If position recording is enabled for this field, also records token positions
402    /// encoded as (element_ordinal << 20) | token_position.
403    fn index_text_field(
404        &mut self,
405        field: Field,
406        doc_id: DocId,
407        text: &str,
408        element_ordinal: u32,
409    ) -> Result<u32> {
410        use crate::dsl::PositionMode;
411
412        let field_id = field.0;
413        let position_mode = self
414            .position_enabled_fields
415            .get(&field_id)
416            .copied()
417            .flatten();
418
419        // Phase 1: Aggregate term frequencies within this document
420        // Also collect positions if enabled
421        // Reuse buffer to avoid allocations
422        self.local_tf_buffer.clear();
423
424        // For position tracking: term -> list of positions in this text
425        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
426
427        let mut token_position = 0u32;
428
429        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
430        for word in text.split_whitespace() {
431            // Build lowercase token in reusable buffer
432            self.token_buffer.clear();
433            for c in word.chars() {
434                if c.is_alphanumeric() {
435                    for lc in c.to_lowercase() {
436                        self.token_buffer.push(lc);
437                    }
438                }
439            }
440
441            if self.token_buffer.is_empty() {
442                continue;
443            }
444
445            // Intern the term directly from buffer - O(1) amortized
446            let is_new_string = !self.term_interner.contains(&self.token_buffer);
447            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
448            if is_new_string {
449                use std::mem::size_of;
450                // string bytes + Spur + arena overhead (2 pointers)
451                self.estimated_memory +=
452                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
453            }
454            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
455
456            // Record position based on mode
457            if let Some(mode) = position_mode {
458                let encoded_pos = match mode {
459                    // Ordinal only: just store element ordinal (token position = 0)
460                    PositionMode::Ordinal => element_ordinal << 20,
461                    // Token position only: just store token position (ordinal = 0)
462                    PositionMode::TokenPosition => token_position,
463                    // Full: encode both
464                    PositionMode::Full => (element_ordinal << 20) | token_position,
465                };
466                local_positions
467                    .entry(term_spur)
468                    .or_default()
469                    .push(encoded_pos);
470            }
471
472            token_position += 1;
473        }
474
475        // Phase 2: Insert aggregated terms into inverted index
476        // Now we only do one inverted_index lookup per unique term in doc
477        for (&term_spur, &tf) in &self.local_tf_buffer {
478            let term_key = TermKey {
479                field: field_id,
480                term: term_spur,
481            };
482
483            let is_new_term = !self.inverted_index.contains_key(&term_key);
484            let posting = self
485                .inverted_index
486                .entry(term_key)
487                .or_insert_with(PostingListBuilder::new);
488            posting.add(doc_id, tf);
489
490            // Incremental memory tracking
491            use std::mem::size_of;
492            self.estimated_memory += size_of::<CompactPosting>();
493            if is_new_term {
494                // HashMap entry overhead + PostingListBuilder + Vec header
495                self.estimated_memory +=
496                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
497            }
498
499            // Add positions if enabled
500            if position_mode.is_some()
501                && let Some(positions) = local_positions.get(&term_spur)
502            {
503                let is_new_pos_term = !self.position_index.contains_key(&term_key);
504                let pos_posting = self
505                    .position_index
506                    .entry(term_key)
507                    .or_insert_with(PositionPostingListBuilder::new);
508                for &pos in positions {
509                    pos_posting.add_position(doc_id, pos);
510                }
511                // Incremental memory tracking for position index
512                self.estimated_memory += positions.len() * size_of::<u32>();
513                if is_new_pos_term {
514                    self.estimated_memory +=
515                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
516                }
517            }
518        }
519
520        Ok(token_position)
521    }
522
523    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
524        use std::mem::size_of;
525
526        // For numeric fields, we use a special encoding
527        let term_str = format!("__num_{}", value);
528        let is_new_string = !self.term_interner.contains(&term_str);
529        let term_spur = self.term_interner.get_or_intern(&term_str);
530
531        let term_key = TermKey {
532            field: field.0,
533            term: term_spur,
534        };
535
536        let is_new_term = !self.inverted_index.contains_key(&term_key);
537        let posting = self
538            .inverted_index
539            .entry(term_key)
540            .or_insert_with(PostingListBuilder::new);
541        posting.add(doc_id, 1);
542
543        // Incremental memory tracking
544        self.estimated_memory += size_of::<CompactPosting>();
545        if is_new_term {
546            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
547        }
548        if is_new_string {
549            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
550        }
551
552        Ok(())
553    }
554
555    /// Index a dense vector field with ordinal tracking
556    fn index_dense_vector_field(
557        &mut self,
558        field: Field,
559        doc_id: DocId,
560        ordinal: u16,
561        vector: &[f32],
562    ) -> Result<()> {
563        let dim = vector.len();
564
565        let builder = self
566            .dense_vectors
567            .entry(field.0)
568            .or_insert_with(|| DenseVectorBuilder::new(dim));
569
570        // Verify dimension consistency
571        if builder.dim != dim && builder.len() > 0 {
572            return Err(crate::Error::Schema(format!(
573                "Dense vector dimension mismatch: expected {}, got {}",
574                builder.dim, dim
575            )));
576        }
577
578        builder.add(doc_id, ordinal, vector);
579
580        // Incremental memory tracking
581        use std::mem::{size_of, size_of_val};
582        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();
583
584        Ok(())
585    }
586
587    /// Index a sparse vector field using dedicated sparse posting lists
588    ///
589    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
590    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
591    ///
592    /// Weights below the configured `weight_threshold` are not indexed.
593    fn index_sparse_vector_field(
594        &mut self,
595        field: Field,
596        doc_id: DocId,
597        ordinal: u16,
598        entries: &[(u32, f32)],
599    ) -> Result<()> {
600        // Get weight threshold from field config (default 0.0 = no filtering)
601        let weight_threshold = self
602            .schema
603            .get_field_entry(field)
604            .and_then(|entry| entry.sparse_vector_config.as_ref())
605            .map(|config| config.weight_threshold)
606            .unwrap_or(0.0);
607
608        let builder = self
609            .sparse_vectors
610            .entry(field.0)
611            .or_insert_with(SparseVectorBuilder::new);
612
613        for &(dim_id, weight) in entries {
614            // Skip weights below threshold
615            if weight.abs() < weight_threshold {
616                continue;
617            }
618
619            // Incremental memory tracking
620            use std::mem::size_of;
621            let is_new_dim = !builder.postings.contains_key(&dim_id);
622            builder.add(dim_id, doc_id, ordinal, weight);
623            self.estimated_memory += size_of::<(DocId, u16, f32)>();
624            if is_new_dim {
625                // HashMap entry overhead + Vec header
626                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
627            }
628        }
629
630        Ok(())
631    }
632
633    /// Write document to streaming store
634    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
635        use byteorder::{LittleEndian, WriteBytesExt};
636
637        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
638
639        self.store_file
640            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
641        self.store_file.write_all(&doc_bytes)?;
642
643        Ok(())
644    }
645
646    /// Build the final segment
647    ///
648    /// Memory optimization: each phase consumes and drops its source data before
649    /// the next phase begins, preventing accumulation of multiple large buffers.
650    pub async fn build<D: Directory + DirectoryWriter>(
651        mut self,
652        dir: &D,
653        segment_id: SegmentId,
654    ) -> Result<SegmentMeta> {
655        // Flush any buffered data
656        self.store_file.flush()?;
657
658        let files = SegmentFiles::new(segment_id.0);
659
660        // Phase 1: Build positions (consumes position_index, still borrows term_interner)
661        let position_index = std::mem::take(&mut self.position_index);
662        let (positions_data, position_offsets) =
663            Self::build_positions_owned(position_index, &self.term_interner)?;
664        // position_index memory is now freed
665
666        // Phase 2: Build postings + store in parallel
667        // Take ownership of inverted_index and term_interner so they're freed after serialization
668        let inverted_index = std::mem::take(&mut self.inverted_index);
669        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
670        let store_path = self.store_path.clone();
671        let schema_clone = self.schema.clone();
672        let num_compression_threads = self.config.num_compression_threads;
673        let compression_level = self.config.compression_level;
674
675        let (postings_result, store_result) = rayon::join(
676            || Self::build_postings_owned(inverted_index, term_interner, &position_offsets),
677            || {
678                Self::build_store_batched(
679                    &store_path,
680                    &schema_clone,
681                    num_compression_threads,
682                    compression_level,
683                )
684            },
685        );
686        // inverted_index and term_interner consumed and freed by build_postings_owned
687
688        let (term_dict_data, postings_data) = postings_result?;
689        let store_data = store_result?;
690
691        // Write and immediately drop large buffers
692        dir.write(&files.term_dict, &term_dict_data).await?;
693        drop(term_dict_data);
694        dir.write(&files.postings, &postings_data).await?;
695        drop(postings_data);
696        dir.write(&files.store, &store_data).await?;
697        drop(store_data);
698
699        if !positions_data.is_empty() {
700            dir.write(&files.positions, &positions_data).await?;
701        }
702        drop(positions_data);
703        drop(position_offsets);
704
705        // Phase 3: Build dense vectors (consumes dense_vectors)
706        let dense_vectors = std::mem::take(&mut self.dense_vectors);
707        if !dense_vectors.is_empty() {
708            let vectors_data = Self::build_vectors_file_binary(dense_vectors, &self.schema)?;
709            if !vectors_data.is_empty() {
710                dir.write(&files.vectors, &vectors_data).await?;
711            }
712        }
713        // dense_vectors memory is now freed
714
715        // Phase 4: Build sparse vectors (sorts in-place, no cloning)
716        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
717        if !sparse_vectors.is_empty() {
718            let sparse_data = Self::build_sparse_file_inplace(&mut sparse_vectors, &self.schema)?;
719            drop(sparse_vectors);
720            if !sparse_data.is_empty() {
721                dir.write(&files.sparse, &sparse_data).await?;
722            }
723        }
724
725        let meta = SegmentMeta {
726            id: segment_id.0,
727            num_docs: self.next_doc_id,
728            field_stats: self.field_stats.clone(),
729        };
730
731        dir.write(&files.meta, &meta.serialize()?).await?;
732
733        // Cleanup temp files
734        let _ = std::fs::remove_file(&self.store_path);
735
736        Ok(meta)
737    }
738
739    /// Build unified vectors file using compact binary format.
740    ///
741    /// Writes directly from the DenseVectorBuilder's flat f32 storage,
742    /// avoiding the expensive Vec<Vec<f32>> clone and JSON serialization.
743    fn build_vectors_file_binary(
744        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
745        schema: &Schema,
746    ) -> Result<Vec<u8>> {
747        use byteorder::{LittleEndian, WriteBytesExt};
748
749        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
750
751        for (field_id, builder) in dense_vectors {
752            if builder.len() == 0 {
753                continue;
754            }
755
756            let field = crate::dsl::Field(field_id);
757            let dense_config = schema
758                .get_field_entry(field)
759                .and_then(|e| e.dense_vector_config.as_ref());
760
761            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
762
763            // Serialize directly from flat storage — no Vec<Vec<f32>> clone
764            let index_bytes = FlatVectorData::serialize_binary_from_flat(
765                index_dim,
766                &builder.vectors,
767                builder.dim,
768                &builder.doc_ids,
769            );
770
771            field_indexes.push((field_id, 4u8, index_bytes)); // 4 = Flat Binary
772            // builder dropped here, freeing vector memory
773        }
774
775        if field_indexes.is_empty() {
776            return Ok(Vec::new());
777        }
778
779        field_indexes.sort_by_key(|(id, _, _)| *id);
780        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
781
782        let mut output = Vec::new();
783        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
784
785        let mut current_offset = header_size as u64;
786        for (field_id, index_type, data) in &field_indexes {
787            output.write_u32::<LittleEndian>(*field_id)?;
788            output.write_u8(*index_type)?;
789            output.write_u64::<LittleEndian>(current_offset)?;
790            output.write_u64::<LittleEndian>(data.len() as u64)?;
791            current_offset += data.len() as u64;
792        }
793
794        for (_, _, data) in field_indexes {
795            output.extend_from_slice(&data);
796        }
797
798        Ok(output)
799    }
800
801    /// Build sparse vectors file, sorting postings in-place (no cloning).
802    ///
803    /// Takes `&mut` to the sparse vectors map so postings can be sorted in-place,
804    /// eliminating the per-dimension clone that previously doubled memory usage.
805    fn build_sparse_file_inplace(
806        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
807        schema: &Schema,
808    ) -> Result<Vec<u8>> {
809        use crate::structures::{BlockSparsePostingList, WeightQuantization};
810        use byteorder::{LittleEndian, WriteBytesExt};
811
812        if sparse_vectors.is_empty() {
813            return Ok(Vec::new());
814        }
815
816        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
817        let mut field_data: Vec<SparseFieldData> = Vec::new();
818
819        for (&field_id, builder) in sparse_vectors.iter_mut() {
820            if builder.is_empty() {
821                continue;
822            }
823
824            let field = crate::dsl::Field(field_id);
825            let sparse_config = schema
826                .get_field_entry(field)
827                .and_then(|e| e.sparse_vector_config.as_ref());
828
829            let quantization = sparse_config
830                .map(|c| c.weight_quantization)
831                .unwrap_or(WeightQuantization::Float32);
832
833            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
834            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);
835
836            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
837
838            for (&dim_id, postings) in builder.postings.iter_mut() {
839                // Sort in-place — no clone needed since we have &mut access
840                postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));
841
842                // Apply posting list pruning: keep only top fraction by weight magnitude
843                if let Some(fraction) = pruning_fraction
844                    && postings.len() > 1
845                    && fraction < 1.0
846                {
847                    let original_len = postings.len();
848                    postings.sort_by(|a, b| {
849                        b.2.abs()
850                            .partial_cmp(&a.2.abs())
851                            .unwrap_or(std::cmp::Ordering::Equal)
852                    });
853                    let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
854                    postings.truncate(keep);
855                    postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
856                }
857
858                let block_list = BlockSparsePostingList::from_postings_with_block_size(
859                    postings,
860                    quantization,
861                    block_size,
862                )
863                .map_err(crate::Error::Io)?;
864
865                let mut bytes = Vec::new();
866                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
867                dim_bytes.insert(dim_id, bytes);
868            }
869
870            field_data.push((field_id, quantization, dim_bytes.len() as u32, dim_bytes));
871        }
872
873        if field_data.is_empty() {
874            return Ok(Vec::new());
875        }
876
877        field_data.sort_by_key(|(id, _, _, _)| *id);
878
879        // Calculate header size (compact format)
880        let mut header_size = 4u64;
881        for (_, _, num_dims, _) in &field_data {
882            header_size += 4 + 1 + 4;
883            header_size += (*num_dims as u64) * 16;
884        }
885
886        let mut output = Vec::new();
887        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
888
889        let mut current_offset = header_size;
890        let mut all_data: Vec<u8> = Vec::new();
891        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
892
893        for (_, _, _, dim_bytes) in &field_data {
894            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
895            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
896            dims.sort();
897
898            for dim_id in dims {
899                let bytes = &dim_bytes[&dim_id];
900                table.push((dim_id, current_offset, bytes.len() as u32));
901                current_offset += bytes.len() as u64;
902                all_data.extend_from_slice(bytes);
903            }
904            field_tables.push(table);
905        }
906
907        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
908            output.write_u32::<LittleEndian>(*field_id)?;
909            output.write_u8(*quantization as u8)?;
910            output.write_u32::<LittleEndian>(*num_dims)?;
911
912            for &(dim_id, offset, length) in &field_tables[i] {
913                output.write_u32::<LittleEndian>(dim_id)?;
914                output.write_u64::<LittleEndian>(offset)?;
915                output.write_u32::<LittleEndian>(length)?;
916            }
917        }
918
919        output.extend_from_slice(&all_data);
920        Ok(output)
921    }
922
923    /// Build positions file, consuming the position_index to avoid cloning positions.
924    #[allow(clippy::type_complexity)]
925    fn build_positions_owned(
926        position_index: HashMap<TermKey, PositionPostingListBuilder>,
927        term_interner: &Rodeo,
928    ) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
929        use crate::structures::PositionPostingList;
930
931        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
932
933        if position_index.is_empty() {
934            return Ok((Vec::new(), position_offsets));
935        }
936
937        // Consume HashMap into Vec for sorting (owned, no borrowing)
938        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
939            .into_iter()
940            .map(|(term_key, pos_builder)| {
941                let term_str = term_interner.resolve(&term_key.term);
942                let mut key = Vec::with_capacity(4 + term_str.len());
943                key.extend_from_slice(&term_key.field.to_le_bytes());
944                key.extend_from_slice(term_str.as_bytes());
945                (key, pos_builder)
946            })
947            .collect();
948
949        entries.sort_by(|a, b| a.0.cmp(&b.0));
950
951        let mut output = Vec::new();
952
953        for (key, pos_builder) in entries {
954            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
955            for (doc_id, positions) in pos_builder.postings {
956                // Move positions instead of cloning — owned data
957                pos_list.push(doc_id, positions);
958            }
959
960            let offset = output.len() as u64;
961            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
962            let len = (output.len() as u64 - offset) as u32;
963
964            position_offsets.insert(key, (offset, len));
965        }
966
967        Ok((output, position_offsets))
968    }
969
970    /// Build postings from inverted index, consuming both the index and interner.
971    ///
972    /// Takes ownership so the inverted_index HashMap and term_interner are freed
973    /// as posting lists are serialized, rather than being held alongside the output.
974    fn build_postings_owned(
975        inverted_index: HashMap<TermKey, PostingListBuilder>,
976        term_interner: Rodeo,
977        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
978    ) -> Result<(Vec<u8>, Vec<u8>)> {
979        // Phase 1: Consume HashMap into sorted Vec (frees HashMap overhead)
980        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
981            .into_iter()
982            .map(|(term_key, posting_list)| {
983                let term_str = term_interner.resolve(&term_key.term);
984                let mut key = Vec::with_capacity(4 + term_str.len());
985                key.extend_from_slice(&term_key.field.to_le_bytes());
986                key.extend_from_slice(term_str.as_bytes());
987                (key, posting_list)
988            })
989            .collect();
990
991        // term_interner no longer needed — all keys are materialized
992        drop(term_interner);
993
994        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
995
996        // Phase 2: Parallel serialization (consumes term_entries via into_par_iter,
997        // each PostingListBuilder is dropped after its posting list is serialized)
998        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
999            .into_par_iter()
1000            .map(|(key, posting_builder)| {
1001                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1002                for p in &posting_builder.postings {
1003                    full_postings.push(p.doc_id, p.term_freq as u32);
1004                }
1005                // posting_builder dropped here — original posting memory freed
1006
1007                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1008                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1009
1010                let has_positions = position_offsets.contains_key(&key);
1011                let result = if !has_positions
1012                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1013                {
1014                    SerializedPosting::Inline(inline)
1015                } else {
1016                    let mut posting_bytes = Vec::new();
1017                    let block_list =
1018                        crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
1019                    block_list.serialize(&mut posting_bytes)?;
1020                    SerializedPosting::External {
1021                        bytes: posting_bytes,
1022                        doc_count: full_postings.doc_count(),
1023                    }
1024                };
1025
1026                Ok((key, result))
1027            })
1028            .collect::<Result<Vec<_>>>()?;
1029
1030        // Phase 3: Sequential assembly
1031        let mut term_dict = Vec::new();
1032        let mut postings = Vec::new();
1033        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1034
1035        for (key, serialized_posting) in serialized {
1036            let term_info = match serialized_posting {
1037                SerializedPosting::Inline(info) => info,
1038                SerializedPosting::External { bytes, doc_count } => {
1039                    let posting_offset = postings.len() as u64;
1040                    let posting_len = bytes.len() as u32;
1041                    postings.extend_from_slice(&bytes);
1042
1043                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1044                        TermInfo::external_with_positions(
1045                            posting_offset,
1046                            posting_len,
1047                            doc_count,
1048                            pos_offset,
1049                            pos_len,
1050                        )
1051                    } else {
1052                        TermInfo::external(posting_offset, posting_len, doc_count)
1053                    }
1054                }
1055            };
1056
1057            writer.insert(&key, &term_info)?;
1058        }
1059
1060        writer.finish()?;
1061        Ok((term_dict, postings))
1062    }
1063
1064    /// Build document store from streamed temp file using batched processing.
1065    ///
1066    /// Processes documents in batches to limit peak memory, rather than
1067    /// materializing all deserialized documents at once.
1068    fn build_store_batched(
1069        store_path: &PathBuf,
1070        schema: &Schema,
1071        num_compression_threads: usize,
1072        compression_level: CompressionLevel,
1073    ) -> Result<Vec<u8>> {
1074        use super::store::EagerParallelStoreWriter;
1075
1076        let file = File::open(store_path)?;
1077        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1078
1079        // Phase 1: Parse document boundaries (sequential, fast)
1080        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1081        let mut offset = 0usize;
1082        while offset + 4 <= mmap.len() {
1083            let doc_len = u32::from_le_bytes([
1084                mmap[offset],
1085                mmap[offset + 1],
1086                mmap[offset + 2],
1087                mmap[offset + 3],
1088            ]) as usize;
1089            offset += 4;
1090
1091            if offset + doc_len > mmap.len() {
1092                break;
1093            }
1094
1095            doc_ranges.push((offset, doc_len));
1096            offset += doc_len;
1097        }
1098
1099        // Phase 2+3: Batched parallel deserialization and compression.
1100        // Each batch is deserialized in parallel, fed to the store writer,
1101        // then dropped before the next batch — limiting peak memory.
1102        const BATCH_SIZE: usize = 10_000;
1103        let mut store_data = Vec::new();
1104        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1105            &mut store_data,
1106            num_compression_threads,
1107            compression_level,
1108        );
1109
1110        for batch in doc_ranges.chunks(BATCH_SIZE) {
1111            let batch_docs: Vec<Document> = batch
1112                .par_iter()
1113                .filter_map(|&(start, len)| {
1114                    let doc_bytes = &mmap[start..start + len];
1115                    super::store::deserialize_document(doc_bytes, schema).ok()
1116                })
1117                .collect();
1118
1119            for doc in &batch_docs {
1120                store_writer.store(doc, schema)?;
1121            }
1122            // batch_docs dropped here, freeing memory for next batch
1123        }
1124
1125        store_writer.finish()?;
1126        Ok(store_data)
1127    }
1128}
1129
1130impl Drop for SegmentBuilder {
1131    fn drop(&mut self) {
1132        // Cleanup temp files on drop
1133        let _ = std::fs::remove_file(&self.store_path);
1134    }
1135}