Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11mod config;
12mod dense;
13#[cfg(feature = "diagnostics")]
14mod diagnostics;
15mod postings;
16mod sparse;
17mod store;
18
19pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
20
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::mem::size_of;
24use std::path::PathBuf;
25
26use hashbrown::HashMap;
27use lasso::{Rodeo, Spur};
28use rustc_hash::FxHashMap;
29
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31use std::sync::Arc;
32
33use crate::directories::{Directory, DirectoryWriter};
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use dense::DenseVectorBuilder;
39use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
40use sparse::SparseVectorBuilder;
41
42/// Size of the document store buffer before writing to disk
43const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
44
45/// Memory overhead per new term in the inverted index:
46/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
47const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
48
49/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
50const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
51
52/// Memory overhead per new term in the position index
53const NEW_POS_TERM_OVERHEAD: usize =
54    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
55
56/// Segment builder with optimized memory usage
57///
58/// Features:
59/// - Streams documents to disk immediately (no in-memory document storage)
60/// - Uses string interning for terms (reduced allocations)
61/// - Uses hashbrown HashMap (faster than BTreeMap)
62pub struct SegmentBuilder {
63    schema: Arc<Schema>,
64    config: SegmentBuilderConfig,
65    tokenizers: FxHashMap<Field, BoxedTokenizer>,
66
67    /// String interner for terms - O(1) lookup and deduplication
68    term_interner: Rodeo,
69
70    /// Inverted index: term key -> posting list
71    inverted_index: HashMap<TermKey, PostingListBuilder>,
72
73    /// Streaming document store writer
74    store_file: BufWriter<File>,
75    store_path: PathBuf,
76
77    /// Document count
78    next_doc_id: DocId,
79
80    /// Per-field statistics for BM25F
81    field_stats: FxHashMap<u32, FieldStats>,
82
83    /// Per-document field lengths stored compactly
84    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
85    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
86    doc_field_lengths: Vec<u32>,
87    num_indexed_fields: usize,
88    field_to_slot: FxHashMap<u32, usize>,
89
90    /// Reusable buffer for per-document term frequency aggregation
91    /// Avoids allocating a new hashmap for each document
92    local_tf_buffer: FxHashMap<Spur, u32>,
93
94    /// Reusable buffer for per-document position tracking (when positions enabled)
95    /// Avoids allocating a new hashmap for each text field per document
96    local_positions: FxHashMap<Spur, Vec<u32>>,
97
98    /// Reusable buffer for tokenization to avoid per-token String allocations
99    token_buffer: String,
100
101    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
102    numeric_buffer: String,
103
104    /// Dense vector storage per field: field -> (doc_ids, vectors)
105    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
106    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
107
108    /// Sparse vector storage per field: field -> SparseVectorBuilder
109    /// Uses proper BlockSparsePostingList with configurable quantization
110    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
111
112    /// Position index for fields with positions enabled
113    /// term key -> position posting list
114    position_index: HashMap<TermKey, PositionPostingListBuilder>,
115
116    /// Fields that have position tracking enabled, with their mode
117    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
118
119    /// Current element ordinal for multi-valued fields (reset per document)
120    current_element_ordinal: FxHashMap<u32, u32>,
121
122    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
123    estimated_memory: usize,
124
125    /// Reusable buffer for document serialization (avoids per-document allocation)
126    doc_serialize_buffer: Vec<u8>,
127}
128
129impl SegmentBuilder {
130    /// Create a new segment builder
131    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
132        let segment_id = uuid::Uuid::new_v4();
133        let store_path = config
134            .temp_dir
135            .join(format!("hermes_store_{}.tmp", segment_id));
136
137        let store_file = BufWriter::with_capacity(
138            STORE_BUFFER_SIZE,
139            OpenOptions::new()
140                .create(true)
141                .write(true)
142                .truncate(true)
143                .open(&store_path)?,
144        );
145
146        // Count indexed fields for compact field length storage
147        // Also track which fields have position recording enabled
148        let mut num_indexed_fields = 0;
149        let mut field_to_slot = FxHashMap::default();
150        let mut position_enabled_fields = FxHashMap::default();
151        for (field, entry) in schema.fields() {
152            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
153                field_to_slot.insert(field.0, num_indexed_fields);
154                num_indexed_fields += 1;
155                if entry.positions.is_some() {
156                    position_enabled_fields.insert(field.0, entry.positions);
157                }
158            }
159        }
160
161        Ok(Self {
162            schema,
163            tokenizers: FxHashMap::default(),
164            term_interner: Rodeo::new(),
165            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
166            store_file,
167            store_path,
168            next_doc_id: 0,
169            field_stats: FxHashMap::default(),
170            doc_field_lengths: Vec::new(),
171            num_indexed_fields,
172            field_to_slot,
173            local_tf_buffer: FxHashMap::default(),
174            local_positions: FxHashMap::default(),
175            token_buffer: String::with_capacity(64),
176            numeric_buffer: String::with_capacity(32),
177            config,
178            dense_vectors: FxHashMap::default(),
179            sparse_vectors: FxHashMap::default(),
180            position_index: HashMap::new(),
181            position_enabled_fields,
182            current_element_ordinal: FxHashMap::default(),
183            estimated_memory: 0,
184            doc_serialize_buffer: Vec::with_capacity(256),
185        })
186    }
187
188    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
189        self.tokenizers.insert(field, tokenizer);
190    }
191
192    /// Get the current element ordinal for a field and increment it.
193    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
194    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
195        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
196        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
197        ordinal
198    }
199
200    pub fn num_docs(&self) -> u32 {
201        self.next_doc_id
202    }
203
204    /// Fast O(1) memory estimate - updated incrementally during indexing
205    #[inline]
206    pub fn estimated_memory_bytes(&self) -> usize {
207        self.estimated_memory
208    }
209
210    /// Count total unique sparse dimensions across all fields
211    pub fn sparse_dim_count(&self) -> usize {
212        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
213    }
214
215    /// Get current statistics for debugging performance (expensive - iterates all data)
216    pub fn stats(&self) -> SegmentBuilderStats {
217        use std::mem::size_of;
218
219        let postings_in_memory: usize =
220            self.inverted_index.values().map(|p| p.postings.len()).sum();
221
222        // Size constants computed from actual types
223        let compact_posting_size = size_of::<CompactPosting>();
224        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
225        let term_key_size = size_of::<TermKey>();
226        let posting_builder_size = size_of::<PostingListBuilder>();
227        let spur_size = size_of::<lasso::Spur>();
228        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
229
230        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
231        // Measured: ~(key_size + value_size + 8) per entry on average
232        let hashmap_entry_base_overhead = 8usize;
233
234        // FxHashMap uses same layout as hashbrown
235        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
236
237        // Postings memory
238        let postings_bytes: usize = self
239            .inverted_index
240            .values()
241            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
242            .sum();
243
244        // Inverted index overhead
245        let index_overhead_bytes = self.inverted_index.len()
246            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
247
248        // Term interner: Rodeo stores strings + metadata
249        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
250        let interner_arena_overhead = 2 * size_of::<usize>();
251        let avg_term_len = 8; // Estimated average term length
252        let interner_bytes =
253            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
254
255        // Doc field lengths
256        let field_lengths_bytes =
257            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
258
259        // Dense vectors
260        let mut dense_vectors_bytes: usize = 0;
261        let mut dense_vector_count: usize = 0;
262        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
263        for b in self.dense_vectors.values() {
264            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
265                + b.doc_ids.capacity() * doc_id_ordinal_size
266                + 2 * vec_overhead; // Two Vecs
267            dense_vector_count += b.doc_ids.len();
268        }
269
270        // Local buffers
271        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
272        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
273
274        // Sparse vectors
275        let mut sparse_vectors_bytes: usize = 0;
276        for builder in self.sparse_vectors.values() {
277            for postings in builder.postings.values() {
278                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
279            }
280            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
281            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
282            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
283        }
284        // Outer FxHashMap overhead
285        let outer_sparse_entry_size =
286            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
287        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
288
289        // Position index
290        let mut position_index_bytes: usize = 0;
291        for pos_builder in self.position_index.values() {
292            for (_, positions) in &pos_builder.postings {
293                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
294            }
295            // Vec<(DocId, Vec<u32>)> entry size
296            let pos_entry_size = size_of::<DocId>() + vec_overhead;
297            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
298        }
299        // HashMap overhead for position_index
300        let pos_index_entry_size =
301            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
302        position_index_bytes += self.position_index.len() * pos_index_entry_size;
303
304        let estimated_memory_bytes = postings_bytes
305            + index_overhead_bytes
306            + interner_bytes
307            + field_lengths_bytes
308            + dense_vectors_bytes
309            + local_tf_buffer_bytes
310            + sparse_vectors_bytes
311            + position_index_bytes;
312
313        let memory_breakdown = MemoryBreakdown {
314            postings_bytes,
315            index_overhead_bytes,
316            interner_bytes,
317            field_lengths_bytes,
318            dense_vectors_bytes,
319            dense_vector_count,
320            sparse_vectors_bytes,
321            position_index_bytes,
322        };
323
324        SegmentBuilderStats {
325            num_docs: self.next_doc_id,
326            unique_terms: self.inverted_index.len(),
327            postings_in_memory,
328            interned_strings: self.term_interner.len(),
329            doc_field_lengths_size: self.doc_field_lengths.len(),
330            estimated_memory_bytes,
331            memory_breakdown,
332        }
333    }
334
335    /// Add a document - streams to disk immediately
336    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
337        let doc_id = self.next_doc_id;
338        self.next_doc_id += 1;
339
340        // Initialize field lengths for this document
341        let base_idx = self.doc_field_lengths.len();
342        self.doc_field_lengths
343            .resize(base_idx + self.num_indexed_fields, 0);
344        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
345
346        // Reset element ordinals for this document (for multi-valued fields)
347        self.current_element_ordinal.clear();
348
349        for (field, value) in doc.field_values() {
350            let Some(entry) = self.schema.get_field_entry(*field) else {
351                continue;
352            };
353
354            // Dense vectors are written to .vectors when indexed || stored
355            // Other field types require indexed
356            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
357                continue;
358            }
359
360            match (&entry.field_type, value) {
361                (FieldType::Text, FieldValue::Text(text)) => {
362                    let element_ordinal = self.next_element_ordinal(field.0);
363                    let token_count =
364                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
365
366                    let stats = self.field_stats.entry(field.0).or_default();
367                    stats.total_tokens += token_count as u64;
368                    if element_ordinal == 0 {
369                        stats.doc_count += 1;
370                    }
371
372                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
373                        self.doc_field_lengths[base_idx + slot] = token_count;
374                    }
375                }
376                (FieldType::U64, FieldValue::U64(v)) => {
377                    self.index_numeric_field(*field, doc_id, *v)?;
378                }
379                (FieldType::I64, FieldValue::I64(v)) => {
380                    self.index_numeric_field(*field, doc_id, *v as u64)?;
381                }
382                (FieldType::F64, FieldValue::F64(v)) => {
383                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
384                }
385                (FieldType::DenseVector, FieldValue::DenseVector(vec))
386                    if entry.indexed || entry.stored =>
387                {
388                    let ordinal = self.next_element_ordinal(field.0);
389                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
390                }
391                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
392                    let ordinal = self.next_element_ordinal(field.0);
393                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
394                }
395                _ => {}
396            }
397        }
398
399        // Stream document to disk immediately
400        self.write_document_to_store(&doc)?;
401
402        Ok(doc_id)
403    }
404
405    /// Index a text field using interned terms
406    ///
407    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
408    /// otherwise falls back to an inline zero-allocation path (split_whitespace
409    /// + lowercase + strip non-alphanumeric).
410    ///
411    /// If position recording is enabled for this field, also records token positions
412    /// encoded as (element_ordinal << 20) | token_position.
413    fn index_text_field(
414        &mut self,
415        field: Field,
416        doc_id: DocId,
417        text: &str,
418        element_ordinal: u32,
419    ) -> Result<u32> {
420        use crate::dsl::PositionMode;
421
422        let field_id = field.0;
423        let position_mode = self
424            .position_enabled_fields
425            .get(&field_id)
426            .copied()
427            .flatten();
428
429        // Phase 1: Aggregate term frequencies within this document
430        // Also collect positions if enabled
431        // Reuse buffers to avoid allocations
432        self.local_tf_buffer.clear();
433        // Clear position Vecs in-place (keeps allocated capacity for reuse)
434        for v in self.local_positions.values_mut() {
435            v.clear();
436        }
437
438        let mut token_position = 0u32;
439
440        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
441        // The owned Vec<Token> is computed first so the immutable borrow of
442        // self.tokenizers ends before we mutate other fields.
443        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
444
445        if let Some(tokens) = custom_tokens {
446            // Custom tokenizer path
447            for token in &tokens {
448                let is_new_string = !self.term_interner.contains(&token.text);
449                let term_spur = self.term_interner.get_or_intern(&token.text);
450                if is_new_string {
451                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
452                }
453                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
454
455                if let Some(mode) = position_mode {
456                    let encoded_pos = match mode {
457                        PositionMode::Ordinal => element_ordinal << 20,
458                        PositionMode::TokenPosition => token.position,
459                        PositionMode::Full => (element_ordinal << 20) | token.position,
460                    };
461                    self.local_positions
462                        .entry(term_spur)
463                        .or_default()
464                        .push(encoded_pos);
465                }
466            }
467            token_position = tokens.len() as u32;
468        } else {
469            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
470            for word in text.split_whitespace() {
471                self.token_buffer.clear();
472                for c in word.chars() {
473                    if c.is_alphanumeric() {
474                        for lc in c.to_lowercase() {
475                            self.token_buffer.push(lc);
476                        }
477                    }
478                }
479
480                if self.token_buffer.is_empty() {
481                    continue;
482                }
483
484                let is_new_string = !self.term_interner.contains(&self.token_buffer);
485                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
486                if is_new_string {
487                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
488                }
489                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
490
491                if let Some(mode) = position_mode {
492                    let encoded_pos = match mode {
493                        PositionMode::Ordinal => element_ordinal << 20,
494                        PositionMode::TokenPosition => token_position,
495                        PositionMode::Full => (element_ordinal << 20) | token_position,
496                    };
497                    self.local_positions
498                        .entry(term_spur)
499                        .or_default()
500                        .push(encoded_pos);
501                }
502
503                token_position += 1;
504            }
505        }
506
507        // Phase 2: Insert aggregated terms into inverted index
508        // Now we only do one inverted_index lookup per unique term in doc
509        for (&term_spur, &tf) in &self.local_tf_buffer {
510            let term_key = TermKey {
511                field: field_id,
512                term: term_spur,
513            };
514
515            let is_new_term = !self.inverted_index.contains_key(&term_key);
516            let posting = self
517                .inverted_index
518                .entry(term_key)
519                .or_insert_with(PostingListBuilder::new);
520            posting.add(doc_id, tf);
521
522            self.estimated_memory += size_of::<CompactPosting>();
523            if is_new_term {
524                self.estimated_memory += NEW_TERM_OVERHEAD;
525            }
526
527            if position_mode.is_some()
528                && let Some(positions) = self.local_positions.get(&term_spur)
529            {
530                let is_new_pos_term = !self.position_index.contains_key(&term_key);
531                let pos_posting = self
532                    .position_index
533                    .entry(term_key)
534                    .or_insert_with(PositionPostingListBuilder::new);
535                for &pos in positions {
536                    pos_posting.add_position(doc_id, pos);
537                }
538                self.estimated_memory += positions.len() * size_of::<u32>();
539                if is_new_pos_term {
540                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
541                }
542            }
543        }
544
545        Ok(token_position)
546    }
547
548    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
549        use std::fmt::Write;
550
551        self.numeric_buffer.clear();
552        write!(self.numeric_buffer, "__num_{}", value).unwrap();
553        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
554        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);
555
556        let term_key = TermKey {
557            field: field.0,
558            term: term_spur,
559        };
560
561        let is_new_term = !self.inverted_index.contains_key(&term_key);
562        let posting = self
563            .inverted_index
564            .entry(term_key)
565            .or_insert_with(PostingListBuilder::new);
566        posting.add(doc_id, 1);
567
568        self.estimated_memory += size_of::<CompactPosting>();
569        if is_new_term {
570            self.estimated_memory += NEW_TERM_OVERHEAD;
571        }
572        if is_new_string {
573            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
574        }
575
576        Ok(())
577    }
578
579    /// Index a dense vector field with ordinal tracking
580    fn index_dense_vector_field(
581        &mut self,
582        field: Field,
583        doc_id: DocId,
584        ordinal: u16,
585        vector: &[f32],
586    ) -> Result<()> {
587        let dim = vector.len();
588
589        let builder = self
590            .dense_vectors
591            .entry(field.0)
592            .or_insert_with(|| DenseVectorBuilder::new(dim));
593
594        // Verify dimension consistency
595        if builder.dim != dim && builder.len() > 0 {
596            return Err(crate::Error::Schema(format!(
597                "Dense vector dimension mismatch: expected {}, got {}",
598                builder.dim, dim
599            )));
600        }
601
602        builder.add(doc_id, ordinal, vector);
603
604        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
605
606        Ok(())
607    }
608
609    /// Index a sparse vector field using dedicated sparse posting lists
610    ///
611    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
612    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
613    ///
614    /// Weights below the configured `weight_threshold` are not indexed.
615    fn index_sparse_vector_field(
616        &mut self,
617        field: Field,
618        doc_id: DocId,
619        ordinal: u16,
620        entries: &[(u32, f32)],
621    ) -> Result<()> {
622        // Get weight threshold from field config (default 0.0 = no filtering)
623        let weight_threshold = self
624            .schema
625            .get_field_entry(field)
626            .and_then(|entry| entry.sparse_vector_config.as_ref())
627            .map(|config| config.weight_threshold)
628            .unwrap_or(0.0);
629
630        let builder = self
631            .sparse_vectors
632            .entry(field.0)
633            .or_insert_with(SparseVectorBuilder::new);
634
635        for &(dim_id, weight) in entries {
636            // Skip weights below threshold
637            if weight.abs() < weight_threshold {
638                continue;
639            }
640
641            let is_new_dim = !builder.postings.contains_key(&dim_id);
642            builder.add(dim_id, doc_id, ordinal, weight);
643            self.estimated_memory += size_of::<(DocId, u16, f32)>();
644            if is_new_dim {
645                // HashMap entry overhead + Vec header
646                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
647            }
648        }
649
650        Ok(())
651    }
652
653    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
654    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
655        use byteorder::{LittleEndian, WriteBytesExt};
656
657        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
658
659        self.store_file
660            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
661        self.store_file.write_all(&self.doc_serialize_buffer)?;
662
663        Ok(())
664    }
665
666    /// Build the final segment
667    ///
668    /// Streams all data directly to disk via StreamingWriter to avoid buffering
669    /// entire serialized outputs in memory. Each phase consumes and drops its
670    /// source data before the next phase begins.
671    pub async fn build<D: Directory + DirectoryWriter>(
672        mut self,
673        dir: &D,
674        segment_id: SegmentId,
675        trained: Option<&super::TrainedVectorStructures>,
676    ) -> Result<SegmentMeta> {
677        // Flush any buffered data
678        self.store_file.flush()?;
679
680        let files = SegmentFiles::new(segment_id.0);
681
682        // Phase 1: Stream positions directly to disk (consumes position_index)
683        let position_index = std::mem::take(&mut self.position_index);
684        let position_offsets = if !position_index.is_empty() {
685            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
686            let offsets = postings::build_positions_streaming(
687                position_index,
688                &self.term_interner,
689                &mut *pos_writer,
690            )?;
691            pos_writer.finish()?;
692            offsets
693        } else {
694            FxHashMap::default()
695        };
696
697        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
698        // These are fully independent: different source data, different output files.
699        let inverted_index = std::mem::take(&mut self.inverted_index);
700        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
701        let store_path = self.store_path.clone();
702        let num_compression_threads = self.config.num_compression_threads;
703        let compression_level = self.config.compression_level;
704        let dense_vectors = std::mem::take(&mut self.dense_vectors);
705        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
706        let schema = &self.schema;
707
708        // Pre-create all streaming writers (async) before entering sync rayon scope
709        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
710        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
711        let mut store_writer = dir.streaming_writer(&files.store).await?;
712        let mut vectors_writer = if !dense_vectors.is_empty() {
713            Some(dir.streaming_writer(&files.vectors).await?)
714        } else {
715            None
716        };
717        let mut sparse_writer = if !sparse_vectors.is_empty() {
718            Some(dir.streaming_writer(&files.sparse).await?)
719        } else {
720            None
721        };
722
723        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
724            || {
725                rayon::join(
726                    || {
727                        postings::build_postings_streaming(
728                            inverted_index,
729                            term_interner,
730                            &position_offsets,
731                            &mut *term_dict_writer,
732                            &mut *postings_writer,
733                        )
734                    },
735                    || {
736                        store::build_store_streaming(
737                            &store_path,
738                            num_compression_threads,
739                            compression_level,
740                            &mut *store_writer,
741                        )
742                    },
743                )
744            },
745            || {
746                rayon::join(
747                    || -> Result<()> {
748                        if let Some(ref mut w) = vectors_writer {
749                            dense::build_vectors_streaming(
750                                dense_vectors,
751                                schema,
752                                trained,
753                                &mut **w,
754                            )?;
755                        }
756                        Ok(())
757                    },
758                    || -> Result<()> {
759                        if let Some(ref mut w) = sparse_writer {
760                            sparse::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
761                        }
762                        Ok(())
763                    },
764                )
765            },
766        );
767        postings_result?;
768        store_result?;
769        vectors_result?;
770        sparse_result?;
771        term_dict_writer.finish()?;
772        postings_writer.finish()?;
773        store_writer.finish()?;
774        if let Some(w) = vectors_writer {
775            w.finish()?;
776        }
777        if let Some(w) = sparse_writer {
778            w.finish()?;
779        }
780        drop(position_offsets);
781        drop(sparse_vectors);
782
783        let meta = SegmentMeta {
784            id: segment_id.0,
785            num_docs: self.next_doc_id,
786            field_stats: self.field_stats.clone(),
787        };
788
789        dir.write(&files.meta, &meta.serialize()?).await?;
790
791        // Cleanup temp files
792        let _ = std::fs::remove_file(&self.store_path);
793
794        Ok(meta)
795    }
796}
797
798impl Drop for SegmentBuilder {
799    fn drop(&mut self) {
800        // Cleanup temp files on drop
801        let _ = std::fs::remove_file(&self.store_path);
802    }
803}