Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11mod config;
12mod dense;
13#[cfg(feature = "diagnostics")]
14mod diagnostics;
15mod postings;
16mod sparse;
17mod store;
18
19pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
20
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::mem::size_of;
24use std::path::PathBuf;
25
26use hashbrown::HashMap;
27use lasso::{Rodeo, Spur};
28use rustc_hash::FxHashMap;
29
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31use std::sync::Arc;
32
33use crate::directories::{Directory, DirectoryWriter};
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use dense::DenseVectorBuilder;
39use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
40use sparse::SparseVectorBuilder;
41
/// Size of the document store buffer before writing to disk.
///
/// Used as the `BufWriter` capacity for the temporary store file that the
/// builder streams serialized documents into.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Memory overhead per new term in the inverted index:
/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
///
/// Added to the incremental memory estimate the first time a term key is
/// inserted into the inverted index.
// NOTE(review): the trailing `24` presumably stands for the hashmap/Vec
// bookkeeping listed above on 64-bit targets — confirm it is not double
// counting a Vec header already inside `PostingListBuilder`.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
///
/// The string bytes themselves are accounted for separately at the call sites
/// (term length + `INTERN_OVERHEAD`).
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Memory overhead per new term in the position index
///
/// Same shape as `NEW_TERM_OVERHEAD`, but sized for `PositionPostingListBuilder`.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
55
/// Segment builder with optimized memory usage
///
/// Accumulates the in-memory index structures for one segment while documents
/// are added, and streams the stored form of each document to a temporary file.
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    /// Schema describing every field (type, indexed/stored/fast flags, ...)
    schema: Arc<Schema>,
    /// Builder configuration (temp directory, map capacity, compression settings)
    config: SegmentBuilderConfig,
    /// Custom tokenizers per field; fields without an entry use the inline
    /// whitespace/lowercase tokenization path
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    /// Path of the temporary file behind `store_file`
    store_path: PathBuf,

    /// Document count
    /// (also the id that will be assigned to the next added document)
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    /// Keyed by field id
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields, i.e. the row width of `doc_field_lengths`
    num_indexed_fields: usize,
    /// Field id -> column (slot) of that field within a `doc_field_lengths` row
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for per-document position tracking (when positions enabled)
    /// Avoids allocating a new hashmap for each text field per document
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
    numeric_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    /// Keyed by field id
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    /// Keyed by field id; a missing entry means no value has been seen yet
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    /// In bytes
    estimated_memory: usize,

    /// Reusable buffer for document serialization (avoids per-document allocation)
    doc_serialize_buffer: Vec<u8>,

    /// Fast-field columnar writers per field_id (only for fields with fast=true)
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
131
132impl SegmentBuilder {
133    /// Create a new segment builder
134    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
135        let segment_id = uuid::Uuid::new_v4();
136        let store_path = config
137            .temp_dir
138            .join(format!("hermes_store_{}.tmp", segment_id));
139
140        let store_file = BufWriter::with_capacity(
141            STORE_BUFFER_SIZE,
142            OpenOptions::new()
143                .create(true)
144                .write(true)
145                .truncate(true)
146                .open(&store_path)?,
147        );
148
149        // Count indexed fields, track positions, and auto-configure tokenizers
150        let registry = crate::tokenizer::TokenizerRegistry::new();
151        let mut num_indexed_fields = 0;
152        let mut field_to_slot = FxHashMap::default();
153        let mut position_enabled_fields = FxHashMap::default();
154        let mut tokenizers = FxHashMap::default();
155        for (field, entry) in schema.fields() {
156            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
157                field_to_slot.insert(field.0, num_indexed_fields);
158                num_indexed_fields += 1;
159                if entry.positions.is_some() {
160                    position_enabled_fields.insert(field.0, entry.positions);
161                }
162                if let Some(ref tok_name) = entry.tokenizer
163                    && let Some(tokenizer) = registry.get(tok_name)
164                {
165                    tokenizers.insert(field, tokenizer);
166                }
167            }
168        }
169
170        // Initialize fast-field writers for fields with fast=true
171        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
172        let mut fast_fields = FxHashMap::default();
173        for (field, entry) in schema.fields() {
174            if entry.fast {
175                let writer = if entry.multi {
176                    match entry.field_type {
177                        FieldType::U64 => {
178                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
179                        }
180                        FieldType::I64 => {
181                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
182                        }
183                        FieldType::F64 => {
184                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
185                        }
186                        FieldType::Text => FastFieldWriter::new_text_multi(),
187                        _ => continue,
188                    }
189                } else {
190                    match entry.field_type {
191                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
192                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
193                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
194                        FieldType::Text => FastFieldWriter::new_text(),
195                        _ => continue,
196                    }
197                };
198                fast_fields.insert(field.0, writer);
199            }
200        }
201
202        Ok(Self {
203            schema,
204            tokenizers,
205            term_interner: Rodeo::new(),
206            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
207            store_file,
208            store_path,
209            next_doc_id: 0,
210            field_stats: FxHashMap::default(),
211            doc_field_lengths: Vec::new(),
212            num_indexed_fields,
213            field_to_slot,
214            local_tf_buffer: FxHashMap::default(),
215            local_positions: FxHashMap::default(),
216            token_buffer: String::with_capacity(64),
217            numeric_buffer: String::with_capacity(32),
218            config,
219            dense_vectors: FxHashMap::default(),
220            sparse_vectors: FxHashMap::default(),
221            position_index: HashMap::new(),
222            position_enabled_fields,
223            current_element_ordinal: FxHashMap::default(),
224            estimated_memory: 0,
225            doc_serialize_buffer: Vec::with_capacity(256),
226            fast_fields,
227        })
228    }
229
    /// Set (or replace) the tokenizer used when indexing text values of `field`.
    ///
    /// Overrides any tokenizer previously registered for the same field.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
233
234    /// Get the current element ordinal for a field and increment it.
235    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
236    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
237        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
238        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
239        ordinal
240    }
241
    /// Number of documents added to this builder so far.
    ///
    /// This is the value of the internal next-document-id counter.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
245
    /// Fast O(1) memory estimate - updated incrementally during indexing
    ///
    /// Returns the running byte counter maintained by the indexing methods; it
    /// is an approximation, not a measurement of actual allocations.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
251
252    /// Count total unique sparse dimensions across all fields
253    pub fn sparse_dim_count(&self) -> usize {
254        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
255    }
256
257    /// Get current statistics for debugging performance (expensive - iterates all data)
258    pub fn stats(&self) -> SegmentBuilderStats {
259        use std::mem::size_of;
260
261        let postings_in_memory: usize =
262            self.inverted_index.values().map(|p| p.postings.len()).sum();
263
264        // Size constants computed from actual types
265        let compact_posting_size = size_of::<CompactPosting>();
266        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
267        let term_key_size = size_of::<TermKey>();
268        let posting_builder_size = size_of::<PostingListBuilder>();
269        let spur_size = size_of::<lasso::Spur>();
270        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
271
272        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
273        // Measured: ~(key_size + value_size + 8) per entry on average
274        let hashmap_entry_base_overhead = 8usize;
275
276        // FxHashMap uses same layout as hashbrown
277        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
278
279        // Postings memory
280        let postings_bytes: usize = self
281            .inverted_index
282            .values()
283            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
284            .sum();
285
286        // Inverted index overhead
287        let index_overhead_bytes = self.inverted_index.len()
288            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
289
290        // Term interner: Rodeo stores strings + metadata
291        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
292        let interner_arena_overhead = 2 * size_of::<usize>();
293        let avg_term_len = 8; // Estimated average term length
294        let interner_bytes =
295            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
296
297        // Doc field lengths
298        let field_lengths_bytes =
299            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
300
301        // Dense vectors
302        let mut dense_vectors_bytes: usize = 0;
303        let mut dense_vector_count: usize = 0;
304        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
305        for b in self.dense_vectors.values() {
306            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
307                + b.doc_ids.capacity() * doc_id_ordinal_size
308                + 2 * vec_overhead; // Two Vecs
309            dense_vector_count += b.doc_ids.len();
310        }
311
312        // Local buffers
313        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
314        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
315
316        // Sparse vectors
317        let mut sparse_vectors_bytes: usize = 0;
318        for builder in self.sparse_vectors.values() {
319            for postings in builder.postings.values() {
320                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
321            }
322            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
323            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
324            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
325        }
326        // Outer FxHashMap overhead
327        let outer_sparse_entry_size =
328            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
329        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
330
331        // Position index
332        let mut position_index_bytes: usize = 0;
333        for pos_builder in self.position_index.values() {
334            for (_, positions) in &pos_builder.postings {
335                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
336            }
337            // Vec<(DocId, Vec<u32>)> entry size
338            let pos_entry_size = size_of::<DocId>() + vec_overhead;
339            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
340        }
341        // HashMap overhead for position_index
342        let pos_index_entry_size =
343            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
344        position_index_bytes += self.position_index.len() * pos_index_entry_size;
345
346        let estimated_memory_bytes = postings_bytes
347            + index_overhead_bytes
348            + interner_bytes
349            + field_lengths_bytes
350            + dense_vectors_bytes
351            + local_tf_buffer_bytes
352            + sparse_vectors_bytes
353            + position_index_bytes;
354
355        let memory_breakdown = MemoryBreakdown {
356            postings_bytes,
357            index_overhead_bytes,
358            interner_bytes,
359            field_lengths_bytes,
360            dense_vectors_bytes,
361            dense_vector_count,
362            sparse_vectors_bytes,
363            position_index_bytes,
364        };
365
366        SegmentBuilderStats {
367            num_docs: self.next_doc_id,
368            unique_terms: self.inverted_index.len(),
369            postings_in_memory,
370            interned_strings: self.term_interner.len(),
371            doc_field_lengths_size: self.doc_field_lengths.len(),
372            estimated_memory_bytes,
373            memory_breakdown,
374        }
375    }
376
377    /// Add a document - streams to disk immediately
378    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
379        let doc_id = self.next_doc_id;
380        self.next_doc_id += 1;
381
382        // Initialize field lengths for this document
383        let base_idx = self.doc_field_lengths.len();
384        self.doc_field_lengths
385            .resize(base_idx + self.num_indexed_fields, 0);
386        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
387
388        // Reset element ordinals for this document (for multi-valued fields)
389        self.current_element_ordinal.clear();
390
391        for (field, value) in doc.field_values() {
392            let Some(entry) = self.schema.get_field_entry(*field) else {
393                continue;
394            };
395
396            // Dense vectors are written to .vectors when indexed || stored
397            // Other field types require indexed or fast
398            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
399            {
400                continue;
401            }
402
403            match (&entry.field_type, value) {
404                (FieldType::Text, FieldValue::Text(text)) => {
405                    if entry.indexed {
406                        let element_ordinal = self.next_element_ordinal(field.0);
407                        let token_count =
408                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
409
410                        let stats = self.field_stats.entry(field.0).or_default();
411                        stats.total_tokens += token_count as u64;
412                        if element_ordinal == 0 {
413                            stats.doc_count += 1;
414                        }
415
416                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
417                            self.doc_field_lengths[base_idx + slot] = token_count;
418                        }
419                    }
420
421                    // Fast-field: store raw text for text ordinal column
422                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
423                        ff.add_text(doc_id, text);
424                    }
425                }
426                (FieldType::U64, FieldValue::U64(v)) => {
427                    if entry.indexed {
428                        self.index_numeric_field(*field, doc_id, *v)?;
429                    }
430                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
431                        ff.add_u64(doc_id, *v);
432                    }
433                }
434                (FieldType::I64, FieldValue::I64(v)) => {
435                    if entry.indexed {
436                        self.index_numeric_field(*field, doc_id, *v as u64)?;
437                    }
438                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
439                        ff.add_i64(doc_id, *v);
440                    }
441                }
442                (FieldType::F64, FieldValue::F64(v)) => {
443                    if entry.indexed {
444                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
445                    }
446                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
447                        ff.add_f64(doc_id, *v);
448                    }
449                }
450                (FieldType::DenseVector, FieldValue::DenseVector(vec))
451                    if entry.indexed || entry.stored =>
452                {
453                    let ordinal = self.next_element_ordinal(field.0);
454                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
455                }
456                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
457                    let ordinal = self.next_element_ordinal(field.0);
458                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
459                }
460                _ => {}
461            }
462        }
463
464        // Stream document to disk immediately
465        self.write_document_to_store(&doc)?;
466
467        Ok(doc_id)
468    }
469
470    /// Index a text field using interned terms
471    ///
472    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
473    /// otherwise falls back to an inline zero-allocation path (split_whitespace
474    /// + lowercase + strip non-alphanumeric).
475    ///
476    /// If position recording is enabled for this field, also records token positions
477    /// encoded as (element_ordinal << 20) | token_position.
478    fn index_text_field(
479        &mut self,
480        field: Field,
481        doc_id: DocId,
482        text: &str,
483        element_ordinal: u32,
484    ) -> Result<u32> {
485        use crate::dsl::PositionMode;
486
487        let field_id = field.0;
488        let position_mode = self
489            .position_enabled_fields
490            .get(&field_id)
491            .copied()
492            .flatten();
493
494        // Phase 1: Aggregate term frequencies within this document
495        // Also collect positions if enabled
496        // Reuse buffers to avoid allocations
497        self.local_tf_buffer.clear();
498        // Clear position Vecs in-place (keeps allocated capacity for reuse)
499        for v in self.local_positions.values_mut() {
500            v.clear();
501        }
502
503        let mut token_position = 0u32;
504
505        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
506        // The owned Vec<Token> is computed first so the immutable borrow of
507        // self.tokenizers ends before we mutate other fields.
508        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
509
510        if let Some(tokens) = custom_tokens {
511            // Custom tokenizer path
512            for token in &tokens {
513                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
514                    spur
515                } else {
516                    let spur = self.term_interner.get_or_intern(&token.text);
517                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
518                    spur
519                };
520                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
521
522                if let Some(mode) = position_mode {
523                    let encoded_pos = match mode {
524                        PositionMode::Ordinal => element_ordinal << 20,
525                        PositionMode::TokenPosition => token.position,
526                        PositionMode::Full => (element_ordinal << 20) | token.position,
527                    };
528                    self.local_positions
529                        .entry(term_spur)
530                        .or_default()
531                        .push(encoded_pos);
532                }
533            }
534            token_position = tokens.len() as u32;
535        } else {
536            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
537            for word in text.split_whitespace() {
538                self.token_buffer.clear();
539                for c in word.chars() {
540                    if c.is_alphanumeric() {
541                        for lc in c.to_lowercase() {
542                            self.token_buffer.push(lc);
543                        }
544                    }
545                }
546
547                if self.token_buffer.is_empty() {
548                    continue;
549                }
550
551                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
552                    spur
553                } else {
554                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
555                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
556                    spur
557                };
558                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
559
560                if let Some(mode) = position_mode {
561                    let encoded_pos = match mode {
562                        PositionMode::Ordinal => element_ordinal << 20,
563                        PositionMode::TokenPosition => token_position,
564                        PositionMode::Full => (element_ordinal << 20) | token_position,
565                    };
566                    self.local_positions
567                        .entry(term_spur)
568                        .or_default()
569                        .push(encoded_pos);
570                }
571
572                token_position += 1;
573            }
574        }
575
576        // Phase 2: Insert aggregated terms into inverted index
577        // Now we only do one inverted_index lookup per unique term in doc
578        for (&term_spur, &tf) in &self.local_tf_buffer {
579            let term_key = TermKey {
580                field: field_id,
581                term: term_spur,
582            };
583
584            match self.inverted_index.entry(term_key) {
585                hashbrown::hash_map::Entry::Occupied(mut o) => {
586                    o.get_mut().add(doc_id, tf);
587                    self.estimated_memory += size_of::<CompactPosting>();
588                }
589                hashbrown::hash_map::Entry::Vacant(v) => {
590                    let mut posting = PostingListBuilder::new();
591                    posting.add(doc_id, tf);
592                    v.insert(posting);
593                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
594                }
595            }
596
597            if position_mode.is_some()
598                && let Some(positions) = self.local_positions.get(&term_spur)
599            {
600                match self.position_index.entry(term_key) {
601                    hashbrown::hash_map::Entry::Occupied(mut o) => {
602                        for &pos in positions {
603                            o.get_mut().add_position(doc_id, pos);
604                        }
605                        self.estimated_memory += positions.len() * size_of::<u32>();
606                    }
607                    hashbrown::hash_map::Entry::Vacant(v) => {
608                        let mut pos_posting = PositionPostingListBuilder::new();
609                        for &pos in positions {
610                            pos_posting.add_position(doc_id, pos);
611                        }
612                        self.estimated_memory +=
613                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
614                        v.insert(pos_posting);
615                    }
616                }
617            }
618        }
619
620        Ok(token_position)
621    }
622
623    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
624        use std::fmt::Write;
625
626        self.numeric_buffer.clear();
627        write!(self.numeric_buffer, "__num_{}", value).unwrap();
628        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
629            spur
630        } else {
631            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
632            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
633            spur
634        };
635
636        let term_key = TermKey {
637            field: field.0,
638            term: term_spur,
639        };
640
641        match self.inverted_index.entry(term_key) {
642            hashbrown::hash_map::Entry::Occupied(mut o) => {
643                o.get_mut().add(doc_id, 1);
644                self.estimated_memory += size_of::<CompactPosting>();
645            }
646            hashbrown::hash_map::Entry::Vacant(v) => {
647                let mut posting = PostingListBuilder::new();
648                posting.add(doc_id, 1);
649                v.insert(posting);
650                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
651            }
652        }
653
654        Ok(())
655    }
656
657    /// Index a dense vector field with ordinal tracking
658    fn index_dense_vector_field(
659        &mut self,
660        field: Field,
661        doc_id: DocId,
662        ordinal: u16,
663        vector: &[f32],
664    ) -> Result<()> {
665        let dim = vector.len();
666
667        let builder = self
668            .dense_vectors
669            .entry(field.0)
670            .or_insert_with(|| DenseVectorBuilder::new(dim));
671
672        // Verify dimension consistency
673        if builder.dim != dim && builder.len() > 0 {
674            return Err(crate::Error::Schema(format!(
675                "Dense vector dimension mismatch: expected {}, got {}",
676                builder.dim, dim
677            )));
678        }
679
680        builder.add(doc_id, ordinal, vector);
681
682        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
683
684        Ok(())
685    }
686
687    /// Index a sparse vector field using dedicated sparse posting lists
688    ///
689    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
690    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
691    ///
692    /// Weights below the configured `weight_threshold` are not indexed.
693    fn index_sparse_vector_field(
694        &mut self,
695        field: Field,
696        doc_id: DocId,
697        ordinal: u16,
698        entries: &[(u32, f32)],
699    ) -> Result<()> {
700        // Get weight threshold from field config (default 0.0 = no filtering)
701        let weight_threshold = self
702            .schema
703            .get_field_entry(field)
704            .and_then(|entry| entry.sparse_vector_config.as_ref())
705            .map(|config| config.weight_threshold)
706            .unwrap_or(0.0);
707
708        let builder = self
709            .sparse_vectors
710            .entry(field.0)
711            .or_insert_with(SparseVectorBuilder::new);
712
713        builder.inc_vector_count();
714
715        for &(dim_id, weight) in entries {
716            // Skip weights below threshold
717            if weight.abs() < weight_threshold {
718                continue;
719            }
720
721            let is_new_dim = !builder.postings.contains_key(&dim_id);
722            builder.add(dim_id, doc_id, ordinal, weight);
723            self.estimated_memory += size_of::<(DocId, u16, f32)>();
724            if is_new_dim {
725                // HashMap entry overhead + Vec header
726                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
727            }
728        }
729
730        Ok(())
731    }
732
733    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
734    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
735        use byteorder::{LittleEndian, WriteBytesExt};
736
737        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
738
739        self.store_file
740            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
741        self.store_file.write_all(&self.doc_serialize_buffer)?;
742
743        Ok(())
744    }
745
746    /// Build the final segment
747    ///
748    /// Streams all data directly to disk via StreamingWriter to avoid buffering
749    /// entire serialized outputs in memory. Each phase consumes and drops its
750    /// source data before the next phase begins.
       ///
       /// Order of operations:
       /// 1. Flush the temporary document store file.
       /// 2. If any positions were collected, stream them to `files.positions` and keep
       ///    the returned offsets; otherwise use an empty offset map.
       /// 3. Run five independent build tasks through nested `rayon::join` calls:
       ///    postings + term dictionary, document store, dense vectors, sparse vectors
       ///    and fast fields. The last three only run when a writer was created for
       ///    them, i.e. when the corresponding collection is non-empty.
       /// 4. Finish every writer, log the per-file byte counts, then write the
       ///    serialized `SegmentMeta` to `files.meta`.
       ///
       /// `trained` is forwarded unchanged to `dense::build_vectors_streaming`.
       ///
       /// # Errors
       ///
       /// Propagates the first failure from flushing, writer creation, any build task
       /// (checked in the order postings, store, dense, sparse, fast), finishing a
       /// writer, or serializing/writing the metadata.
751    pub async fn build<D: Directory + DirectoryWriter>(
752        mut self,
753        dir: &D,
754        segment_id: SegmentId,
755        trained: Option<&super::TrainedVectorStructures>,
756    ) -> Result<SegmentMeta> {
757        // Flush any buffered data
758        self.store_file.flush()?;
759
760        let files = SegmentFiles::new(segment_id.0);
761
762        // Phase 1: Stream positions directly to disk (consumes position_index)
763        let position_index = std::mem::take(&mut self.position_index);
764        let position_offsets = if !position_index.is_empty() {
765            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
766            let offsets = postings::build_positions_streaming(
767                position_index,
768                &self.term_interner,
769                &mut *pos_writer,
770            )?;
771            pos_writer.finish()?;
772            offsets
773        } else {
774            FxHashMap::default()
775        };
776
777        // Phase 2: 5-way parallel build — postings, store, dense vectors, sparse vectors, fast fields
778        // These are fully independent: different source data, different output files.
779        let inverted_index = std::mem::take(&mut self.inverted_index);
               // Move the interner out by value, leaving a fresh empty `Rodeo` in `self`
780        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
781        let store_path = self.store_path.clone();
782        let num_compression_threads = self.config.num_compression_threads;
783        let compression_level = self.config.compression_level;
784        let dense_vectors = std::mem::take(&mut self.dense_vectors);
785        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
786        let schema = &self.schema;
787
788        // Pre-create all streaming writers (async) before entering sync rayon scope
789        // Wrapped in OffsetWriter to track bytes written per phase.
790        let mut term_dict_writer =
791            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
792        let mut postings_writer =
793            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
794        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
               // Optional outputs: a writer is only requested when there is data for it
795        let mut vectors_writer = if !dense_vectors.is_empty() {
796            Some(super::OffsetWriter::new(
797                dir.streaming_writer(&files.vectors).await?,
798            ))
799        } else {
800            None
801        };
802        let mut sparse_writer = if !sparse_vectors.is_empty() {
803            Some(super::OffsetWriter::new(
804                dir.streaming_writer(&files.sparse).await?,
805            ))
806        } else {
807            None
808        };
809        let mut fast_fields = std::mem::take(&mut self.fast_fields);
810        let num_docs = self.next_doc_id;
811        let mut fast_writer = if !fast_fields.is_empty() {
812            Some(super::OffsetWriter::new(
813                dir.streaming_writer(&files.fast).await?,
814            ))
815        } else {
816            None
817        };
818
               // Join tree: (postings | store) | ((dense | sparse) | fast fields).
               // The destructuring pattern below mirrors that nesting.
819        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
820            rayon::join(
821                || {
822                    rayon::join(
823                        || {
824                            postings::build_postings_streaming(
825                                inverted_index,
826                                term_interner,
827                                &position_offsets,
828                                &mut term_dict_writer,
829                                &mut postings_writer,
830                            )
831                        },
832                        || {
833                            store::build_store_streaming(
834                                &store_path,
835                                num_compression_threads,
836                                compression_level,
837                                &mut store_writer,
838                                num_docs,
839                            )
840                        },
841                    )
842                },
843                || {
844                    rayon::join(
845                        || {
846                            rayon::join(
847                                || -> Result<()> {
848                                    if let Some(ref mut w) = vectors_writer {
849                                        dense::build_vectors_streaming(
850                                            dense_vectors,
851                                            schema,
852                                            trained,
853                                            w,
854                                        )?;
855                                    }
856                                    Ok(())
857                                },
858                                || -> Result<()> {
859                                    if let Some(ref mut w) = sparse_writer {
860                                        sparse::build_sparse_streaming(
861                                            &mut sparse_vectors,
862                                            schema,
863                                            w,
864                                        )?;
865                                    }
866                                    Ok(())
867                                },
868                            )
869                        },
870                        || -> Result<()> {
871                            if let Some(ref mut w) = fast_writer {
872                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
873                            }
874                            Ok(())
875                        },
876                    )
877                },
878            );
               // All tasks have returned by now; surface the first error in a fixed order
879        postings_result?;
880        store_result?;
881        vectors_result?;
882        sparse_result?;
883        fast_result?;
884
               // Capture per-file byte counts before finish() is called on the writers
               // (missing optional writers count as 0)
885        let term_dict_bytes = term_dict_writer.offset() as usize;
886        let postings_bytes = postings_writer.offset() as usize;
887        let store_bytes = store_writer.offset() as usize;
888        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
889        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
890        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
891
892        term_dict_writer.finish()?;
893        postings_writer.finish()?;
894        store_writer.finish()?;
895        if let Some(w) = vectors_writer {
896            w.finish()?;
897        }
898        if let Some(w) = sparse_writer {
899            w.finish()?;
900        }
901        if let Some(w) = fast_writer {
902            w.finish()?;
903        }
               // Explicitly release these before the metadata is written
904        drop(position_offsets);
905        drop(sparse_vectors);
906
907        log::info!(
908            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
909            num_docs,
910            super::format_bytes(term_dict_bytes),
911            super::format_bytes(postings_bytes),
912            super::format_bytes(store_bytes),
913            super::format_bytes(vectors_bytes),
914            super::format_bytes(sparse_bytes),
915            super::format_bytes(fast_bytes),
916        );
917
918        let meta = SegmentMeta {
919            id: segment_id.0,
920            num_docs: self.next_doc_id,
921            field_stats: self.field_stats.clone(),
922        };
923
               // The metadata file is written last, after every data file has been finished
924        dir.write(&files.meta, &meta.serialize()?).await?;
925
926        // Cleanup temp files (best effort: a removal error is deliberately ignored)
927        let _ = std::fs::remove_file(&self.store_path);
928
929        Ok(meta)
930    }
931}
932
933/// Serialize all fast-field columns to a `.fast` file.
934fn build_fast_fields_streaming(
935    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
936    num_docs: u32,
937    writer: &mut dyn Write,
938) -> Result<()> {
939    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
940
941    if fast_fields.is_empty() {
942        return Ok(());
943    }
944
945    // Sort fields by id for deterministic output
946    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
947    field_ids.sort_unstable();
948
949    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
950    let mut current_offset = 0u64;
951
952    for &field_id in &field_ids {
953        let ff = fast_fields.get_mut(&field_id).unwrap();
954        ff.pad_to(num_docs);
955
956        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
957        toc.field_id = field_id;
958        current_offset += bytes_written;
959        toc_entries.push(toc);
960    }
961
962    // Write TOC + footer
963    let toc_offset = current_offset;
964    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
965
966    Ok(())
967}
968
969impl Drop for SegmentBuilder {
970    fn drop(&mut self) {
971        // Cleanup temp files on drop
972        let _ = std::fs::remove_file(&self.store_path);
973    }
974}