
hermes_core/segment/builder/mod.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
//! - **Parallel posting serialization**: Rayon parallel sort + serialize
//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely

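//! # Usage sketch
//!
//! An illustrative flow, not a doctest: `schema`, `docs`, `dir`, `segment_id`, and
//! `config` stand in for values the caller already has (the config being a
//! `SegmentBuilderConfig`).
//!
//! ```ignore
//! let mut builder = SegmentBuilder::new(schema.clone(), config)?;
//! for doc in docs {
//!     builder.add_document(doc)?;
//! }
//! let meta = builder.build(&dir, segment_id, None).await?;
//! ```
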
mod config;
mod dense_build;
mod posting;
mod postings_build;
mod sparse_build;
mod store_build;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};

use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

/// Size of the document store buffer before writing to disk
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Memory overhead per new term in the inverted index:
/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Memory overhead per new term in the position index
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    schema: Arc<Schema>,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
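    /// With `num_indexed_fields == 2`, for example, doc 3 / field slot 1 lives at
    /// flat index `3 * 2 + 1 = 7`.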
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for per-document position tracking (when positions enabled)
    /// Avoids allocating a new hashmap for each text field per document
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
    numeric_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    estimated_memory: usize,

    /// Reusable buffer for document serialization (avoids per-document allocation)
    doc_serialize_buffer: Vec<u8>,
}

impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        // Also track which fields have position recording enabled
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            numeric_buffer: String::with_capacity(32),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
            doc_serialize_buffer: Vec::with_capacity(256),
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Get the current element ordinal for a field and increment it.
    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
        let counter = self.current_element_ordinal.entry(field_id).or_insert(0);
        let ordinal = *counter;
        *counter += 1;
        ordinal
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Fast O(1) memory estimate - updated incrementally during indexing
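    ///
    /// A typical caller compares this against a memory budget to decide when to
    /// finish the current builder and start a new one (illustrative; the budget is
    /// the caller's own knob, not part of this type):
    ///
    /// ```ignore
    /// if builder.estimated_memory_bytes() > memory_budget {
    ///     // build this segment and create a fresh SegmentBuilder
    /// }
    /// ```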
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

    /// Count sparse dimensions, summed over fields (a dimension indexed by more
    /// than one field is counted once per field)
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

    /// Get current statistics for debugging performance (expensive - iterates all data)
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        // Size constants computed from actual types
        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
        // Measured: ~(key_size + value_size + 8) per entry on average
        let hashmap_entry_base_overhead = 8usize;

        // FxHashMap uses same layout as hashbrown
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Postings memory
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        // Inverted index overhead
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Term interner: Rodeo stores strings + metadata
        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8; // Estimated average term length
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        // Doc field lengths
        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead; // Two Vecs
            dense_vector_count += b.doc_ids.len();
        }

        // Local buffers
        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        // Outer FxHashMap overhead
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            // Vec<(DocId, Vec<u32>)> entry size
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        // HashMap overhead for position_index
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Reset element ordinals for this document (for multi-valued fields)
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };

            // Dense vectors are written to .vectors when indexed || stored
            // Other field types require indexed
            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let element_ordinal = self.next_element_ordinal(field.0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        // Accumulate so multi-valued fields count tokens from all elements
                        self.doc_field_lengths[base_idx + slot] += token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    let ordinal = self.next_element_ordinal(field.0);
                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
    /// otherwise falls back to an inline zero-allocation path (split_whitespace
    /// + lowercase + strip non-alphanumeric).
    ///
    /// If position recording is enabled for this field, also records token positions
    /// encoded as (element_ordinal << 20) | token_position.
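    ///
    /// For example, with `PositionMode::Full`, element ordinal 2 and token position 5
    /// encode to `(2 << 20) | 5 == 2_097_157`; assuming token positions stay below
    /// 2^20, the pair can be recovered as `pos >> 20` and `pos & 0xFFFFF`.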
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Phase 1: Aggregate term frequencies within this document
        // Also collect positions if enabled
        // Reuse buffers to avoid allocations
        self.local_tf_buffer.clear();
        // Clear position Vecs in-place (keeps allocated capacity for reuse)
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
        // The owned Vec<Token> is computed first so the immutable borrow of
        // self.tokenizers ends before we mutate other fields.
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            // Custom tokenizer path
            for token in &tokens {
                let is_new_string = !self.term_interner.contains(&token.text);
                let term_spur = self.term_interner.get_or_intern(&token.text);
                if is_new_string {
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            token_position = tokens.len() as u32;
        } else {
            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                if self.token_buffer.is_empty() {
                    continue;
                }

                let is_new_string = !self.term_interner.contains(&self.token_buffer);
                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
                if is_new_string {
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                }
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                self.estimated_memory += NEW_TERM_OVERHEAD;
            }

            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
                }
            }
        }

        Ok(token_position)
    }

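    /// Index a numeric field as an exact-match term.
    ///
    /// Values are encoded as `__num_{value}` terms using the mapping applied in
    /// `add_document`: U64 as-is, I64 reinterpreted via `as u64`, and F64 via
    /// `to_bits()`. For example, `42u64` becomes `__num_42` and `-1i64` becomes
    /// `__num_18446744073709551615`; query-side lookups have to apply the same
    /// mapping to match.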
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::fmt::Write;

        self.numeric_buffer.clear();
        write!(self.numeric_buffer, "__num_{}", value).unwrap();
        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += NEW_TERM_OVERHEAD;
        }
        if is_new_string {
            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
        }

        Ok(())
    }

    /// Index a dense vector field with ordinal tracking
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // Verify dimension consistency
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

    /// Index a sparse vector field using dedicated sparse posting lists
    ///
    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
    ///
    /// Weights below the configured `weight_threshold` are not indexed.
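    /// The comparison uses the absolute value: with `weight_threshold = 0.05`, an
    /// input of `[(7, 0.80), (12, 0.01), (99, -0.30)]` indexes dimensions 7 and 99
    /// and drops dimension 12.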
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        // Get weight threshold from field config (default 0.0 = no filtering)
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            // Skip weights below threshold
            if weight.abs() < weight_threshold {
                continue;
            }

            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                // HashMap entry overhead + Vec header
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
            }
        }

        Ok(())
    }

    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
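    ///
    /// Each record is framed as a little-endian `u32` byte length followed by the
    /// serialized document bytes.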
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;

        self.store_file
            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
        self.store_file.write_all(&self.doc_serialize_buffer)?;

        Ok(())
    }

    /// Build the final segment
    ///
    /// Streams all data directly to disk via StreamingWriter to avoid buffering
    /// entire serialized outputs in memory. Each phase consumes and drops its
    /// source data before the next phase begins.
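    ///
    /// Output files (see `SegmentFiles`): positions (only if any field records
    /// positions), term dictionary, postings, document store, dense and sparse
    /// vectors (only if present), and the segment metadata.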
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Phase 1: Stream positions directly to disk (consumes position_index)
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings_build::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
        // These are fully independent: different source data, different output files.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Pre-create all streaming writers (async) before entering sync rayon scope
        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        postings_build::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        store_build::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            dense_build::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            sparse_build::build_sparse_streaming(
                                &mut sparse_vectors,
                                schema,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                )
            },
        );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
    }
}