Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16mod postings;
17mod sparse;
18mod store;
19
20pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
21
22use std::fs::{File, OpenOptions};
23use std::io::{BufWriter, Write};
24use std::mem::size_of;
25use std::path::PathBuf;
26
27use hashbrown::HashMap;
28use lasso::{Rodeo, Spur};
29use rustc_hash::FxHashMap;
30
31use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
32use std::sync::Arc;
33
34use crate::directories::{Directory, DirectoryWriter};
35use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
36use crate::tokenizer::BoxedTokenizer;
37use crate::{DocId, Result};
38
39use dense::DenseVectorBuilder;
40use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
41use sparse::SparseVectorBuilder;
42
43/// Size of the document store buffer before writing to disk
44const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
45
46/// Memory overhead per new term in the inverted index:
47/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
48const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
49
50/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
51const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
52
53/// Memory overhead per new term in the position index
54const NEW_POS_TERM_OVERHEAD: usize =
55    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
56
57/// Segment builder with optimized memory usage
58///
59/// Features:
60/// - Streams documents to disk immediately (no in-memory document storage)
61/// - Uses string interning for terms (reduced allocations)
62/// - Uses hashbrown HashMap (faster than BTreeMap)
63pub struct SegmentBuilder {
64    schema: Arc<Schema>,
65    config: SegmentBuilderConfig,
66    tokenizers: FxHashMap<Field, BoxedTokenizer>,
67
68    /// String interner for terms - O(1) lookup and deduplication
69    term_interner: Rodeo,
70
71    /// Inverted index: term key -> posting list
72    inverted_index: HashMap<TermKey, PostingListBuilder>,
73
74    /// Streaming document store writer
75    store_file: BufWriter<File>,
76    store_path: PathBuf,
77
78    /// Document count
79    next_doc_id: DocId,
80
81    /// Per-field statistics for BM25F
82    field_stats: FxHashMap<u32, FieldStats>,
83
84    /// Per-document field lengths stored compactly
85    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
86    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
87    doc_field_lengths: Vec<u32>,
88    num_indexed_fields: usize,
89    field_to_slot: FxHashMap<u32, usize>,
90
91    /// Reusable buffer for per-document term frequency aggregation
92    /// Avoids allocating a new hashmap for each document
93    local_tf_buffer: FxHashMap<Spur, u32>,
94
95    /// Reusable buffer for per-document position tracking (when positions enabled)
96    /// Avoids allocating a new hashmap for each text field per document
97    local_positions: FxHashMap<Spur, Vec<u32>>,
98
99    /// Reusable buffer for tokenization to avoid per-token String allocations
100    token_buffer: String,
101
102    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
103    numeric_buffer: String,
104
105    /// Dense vector storage per field: field -> (doc_ids, vectors)
106    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
107    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
108
109    /// Sparse vector storage per field: field -> SparseVectorBuilder
110    /// Uses proper BlockSparsePostingList with configurable quantization
111    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
112
113    /// Position index for fields with positions enabled
114    /// term key -> position posting list
115    position_index: HashMap<TermKey, PositionPostingListBuilder>,
116
117    /// Fields that have position tracking enabled, with their mode
118    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
119
120    /// Current element ordinal for multi-valued fields (reset per document)
121    current_element_ordinal: FxHashMap<u32, u32>,
122
123    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
124    estimated_memory: usize,
125
126    /// Reusable buffer for document serialization (avoids per-document allocation)
127    doc_serialize_buffer: Vec<u8>,
128
129    /// Fast-field columnar writers per field_id (only for fields with fast=true)
130    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
131}
132
133impl SegmentBuilder {
134    /// Create a new segment builder
135    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
136        let segment_id = uuid::Uuid::new_v4();
137        let store_path = config
138            .temp_dir
139            .join(format!("hermes_store_{}.tmp", segment_id));
140
141        let store_file = BufWriter::with_capacity(
142            STORE_BUFFER_SIZE,
143            OpenOptions::new()
144                .create(true)
145                .write(true)
146                .truncate(true)
147                .open(&store_path)?,
148        );
149
150        // Count indexed fields, track positions, and auto-configure tokenizers
151        let registry = crate::tokenizer::TokenizerRegistry::new();
152        let mut num_indexed_fields = 0;
153        let mut field_to_slot = FxHashMap::default();
154        let mut position_enabled_fields = FxHashMap::default();
155        let mut tokenizers = FxHashMap::default();
156        for (field, entry) in schema.fields() {
157            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
158                field_to_slot.insert(field.0, num_indexed_fields);
159                num_indexed_fields += 1;
160                if entry.positions.is_some() {
161                    position_enabled_fields.insert(field.0, entry.positions);
162                }
163                if let Some(ref tok_name) = entry.tokenizer
164                    && let Some(tokenizer) = registry.get(tok_name)
165                {
166                    tokenizers.insert(field, tokenizer);
167                }
168            }
169        }
170
171        // Initialize fast-field writers for fields with fast=true
172        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
173        let mut fast_fields = FxHashMap::default();
174        for (field, entry) in schema.fields() {
175            if entry.fast {
176                let writer = if entry.multi {
177                    match entry.field_type {
178                        FieldType::U64 => {
179                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
180                        }
181                        FieldType::I64 => {
182                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
183                        }
184                        FieldType::F64 => {
185                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
186                        }
187                        FieldType::Text => FastFieldWriter::new_text_multi(),
188                        _ => continue,
189                    }
190                } else {
191                    match entry.field_type {
192                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
193                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
194                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
195                        FieldType::Text => FastFieldWriter::new_text(),
196                        _ => continue,
197                    }
198                };
199                fast_fields.insert(field.0, writer);
200            }
201        }
202
203        Ok(Self {
204            schema,
205            tokenizers,
206            term_interner: Rodeo::new(),
207            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
208            store_file,
209            store_path,
210            next_doc_id: 0,
211            field_stats: FxHashMap::default(),
212            doc_field_lengths: Vec::new(),
213            num_indexed_fields,
214            field_to_slot,
215            local_tf_buffer: FxHashMap::default(),
216            local_positions: FxHashMap::default(),
217            token_buffer: String::with_capacity(64),
218            numeric_buffer: String::with_capacity(32),
219            config,
220            dense_vectors: FxHashMap::default(),
221            sparse_vectors: FxHashMap::default(),
222            position_index: HashMap::new(),
223            position_enabled_fields,
224            current_element_ordinal: FxHashMap::default(),
225            estimated_memory: 0,
226            doc_serialize_buffer: Vec::with_capacity(256),
227            fast_fields,
228        })
229    }
230
231    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
232        self.tokenizers.insert(field, tokenizer);
233    }
234
235    /// Get the current element ordinal for a field and increment it.
236    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
237    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
238        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
239        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
240        ordinal
241    }
242
243    pub fn num_docs(&self) -> u32 {
244        self.next_doc_id
245    }
246
247    /// Fast O(1) memory estimate - updated incrementally during indexing
248    #[inline]
249    pub fn estimated_memory_bytes(&self) -> usize {
250        self.estimated_memory
251    }
252
253    /// Count total unique sparse dimensions across all fields
254    pub fn sparse_dim_count(&self) -> usize {
255        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
256    }
257
258    /// Get current statistics for debugging performance (expensive - iterates all data)
259    pub fn stats(&self) -> SegmentBuilderStats {
260        use std::mem::size_of;
261
262        let postings_in_memory: usize =
263            self.inverted_index.values().map(|p| p.postings.len()).sum();
264
265        // Size constants computed from actual types
266        let compact_posting_size = size_of::<CompactPosting>();
267        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
268        let term_key_size = size_of::<TermKey>();
269        let posting_builder_size = size_of::<PostingListBuilder>();
270        let spur_size = size_of::<lasso::Spur>();
271        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
272
273        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
274        // Measured: ~(key_size + value_size + 8) per entry on average
275        let hashmap_entry_base_overhead = 8usize;
276
277        // FxHashMap uses same layout as hashbrown
278        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
279
280        // Postings memory
281        let postings_bytes: usize = self
282            .inverted_index
283            .values()
284            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
285            .sum();
286
287        // Inverted index overhead
288        let index_overhead_bytes = self.inverted_index.len()
289            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
290
291        // Term interner: Rodeo stores strings + metadata
292        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
293        let interner_arena_overhead = 2 * size_of::<usize>();
294        let avg_term_len = 8; // Estimated average term length
295        let interner_bytes =
296            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
297
298        // Doc field lengths
299        let field_lengths_bytes =
300            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
301
302        // Dense vectors
303        let mut dense_vectors_bytes: usize = 0;
304        let mut dense_vector_count: usize = 0;
305        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
306        for b in self.dense_vectors.values() {
307            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
308                + b.doc_ids.capacity() * doc_id_ordinal_size
309                + 2 * vec_overhead; // Two Vecs
310            dense_vector_count += b.doc_ids.len();
311        }
312
313        // Local buffers
314        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
315        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
316
317        // Sparse vectors
318        let mut sparse_vectors_bytes: usize = 0;
319        for builder in self.sparse_vectors.values() {
320            for postings in builder.postings.values() {
321                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
322            }
323            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
324            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
325            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
326        }
327        // Outer FxHashMap overhead
328        let outer_sparse_entry_size =
329            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
330        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
331
332        // Position index
333        let mut position_index_bytes: usize = 0;
334        for pos_builder in self.position_index.values() {
335            for (_, positions) in &pos_builder.postings {
336                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
337            }
338            // Vec<(DocId, Vec<u32>)> entry size
339            let pos_entry_size = size_of::<DocId>() + vec_overhead;
340            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
341        }
342        // HashMap overhead for position_index
343        let pos_index_entry_size =
344            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
345        position_index_bytes += self.position_index.len() * pos_index_entry_size;
346
347        let estimated_memory_bytes = postings_bytes
348            + index_overhead_bytes
349            + interner_bytes
350            + field_lengths_bytes
351            + dense_vectors_bytes
352            + local_tf_buffer_bytes
353            + sparse_vectors_bytes
354            + position_index_bytes;
355
356        let memory_breakdown = MemoryBreakdown {
357            postings_bytes,
358            index_overhead_bytes,
359            interner_bytes,
360            field_lengths_bytes,
361            dense_vectors_bytes,
362            dense_vector_count,
363            sparse_vectors_bytes,
364            position_index_bytes,
365        };
366
367        SegmentBuilderStats {
368            num_docs: self.next_doc_id,
369            unique_terms: self.inverted_index.len(),
370            postings_in_memory,
371            interned_strings: self.term_interner.len(),
372            doc_field_lengths_size: self.doc_field_lengths.len(),
373            estimated_memory_bytes,
374            memory_breakdown,
375        }
376    }
377
378    /// Add a document - streams to disk immediately
379    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
380        let doc_id = self.next_doc_id;
381        self.next_doc_id += 1;
382
383        // Initialize field lengths for this document
384        let base_idx = self.doc_field_lengths.len();
385        self.doc_field_lengths
386            .resize(base_idx + self.num_indexed_fields, 0);
387        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
388
389        // Reset element ordinals for this document (for multi-valued fields)
390        self.current_element_ordinal.clear();
391
392        for (field, value) in doc.field_values() {
393            let Some(entry) = self.schema.get_field_entry(*field) else {
394                continue;
395            };
396
397            // Dense vectors are written to .vectors when indexed || stored
398            // Other field types require indexed or fast
399            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
400            {
401                continue;
402            }
403
404            match (&entry.field_type, value) {
405                (FieldType::Text, FieldValue::Text(text)) => {
406                    if entry.indexed {
407                        let element_ordinal = self.next_element_ordinal(field.0);
408                        let token_count =
409                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
410
411                        let stats = self.field_stats.entry(field.0).or_default();
412                        stats.total_tokens += token_count as u64;
413                        if element_ordinal == 0 {
414                            stats.doc_count += 1;
415                        }
416
417                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
418                            self.doc_field_lengths[base_idx + slot] = token_count;
419                        }
420                    }
421
422                    // Fast-field: store raw text for text ordinal column
423                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
424                        ff.add_text(doc_id, text);
425                    }
426                }
427                (FieldType::U64, FieldValue::U64(v)) => {
428                    if entry.indexed {
429                        self.index_numeric_field(*field, doc_id, *v)?;
430                    }
431                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
432                        ff.add_u64(doc_id, *v);
433                    }
434                }
435                (FieldType::I64, FieldValue::I64(v)) => {
436                    if entry.indexed {
437                        self.index_numeric_field(*field, doc_id, *v as u64)?;
438                    }
439                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
440                        ff.add_i64(doc_id, *v);
441                    }
442                }
443                (FieldType::F64, FieldValue::F64(v)) => {
444                    if entry.indexed {
445                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
446                    }
447                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
448                        ff.add_f64(doc_id, *v);
449                    }
450                }
451                (FieldType::DenseVector, FieldValue::DenseVector(vec))
452                    if entry.indexed || entry.stored =>
453                {
454                    let ordinal = self.next_element_ordinal(field.0);
455                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
456                }
457                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
458                    let ordinal = self.next_element_ordinal(field.0);
459                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
460                }
461                _ => {}
462            }
463        }
464
465        // Stream document to disk immediately
466        self.write_document_to_store(&doc)?;
467
468        Ok(doc_id)
469    }
470
471    /// Index a text field using interned terms
472    ///
473    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
474    /// otherwise falls back to an inline zero-allocation path (split_whitespace
475    /// + lowercase + strip non-alphanumeric).
476    ///
477    /// If position recording is enabled for this field, also records token positions
478    /// encoded as (element_ordinal << 20) | token_position.
479    fn index_text_field(
480        &mut self,
481        field: Field,
482        doc_id: DocId,
483        text: &str,
484        element_ordinal: u32,
485    ) -> Result<u32> {
486        use crate::dsl::PositionMode;
487
488        let field_id = field.0;
489        let position_mode = self
490            .position_enabled_fields
491            .get(&field_id)
492            .copied()
493            .flatten();
494
495        // Phase 1: Aggregate term frequencies within this document
496        // Also collect positions if enabled
497        // Reuse buffers to avoid allocations
498        self.local_tf_buffer.clear();
499        // Clear position Vecs in-place (keeps allocated capacity for reuse)
500        for v in self.local_positions.values_mut() {
501            v.clear();
502        }
503
504        let mut token_position = 0u32;
505
506        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
507        // The owned Vec<Token> is computed first so the immutable borrow of
508        // self.tokenizers ends before we mutate other fields.
509        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
510
511        if let Some(tokens) = custom_tokens {
512            // Custom tokenizer path
513            for token in &tokens {
514                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
515                    spur
516                } else {
517                    let spur = self.term_interner.get_or_intern(&token.text);
518                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
519                    spur
520                };
521                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
522
523                if let Some(mode) = position_mode {
524                    let encoded_pos = match mode {
525                        PositionMode::Ordinal => element_ordinal << 20,
526                        PositionMode::TokenPosition => token.position,
527                        PositionMode::Full => (element_ordinal << 20) | token.position,
528                    };
529                    self.local_positions
530                        .entry(term_spur)
531                        .or_default()
532                        .push(encoded_pos);
533                }
534            }
535            token_position = tokens.len() as u32;
536        } else {
537            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
538            for word in text.split_whitespace() {
539                self.token_buffer.clear();
540                for c in word.chars() {
541                    if c.is_alphanumeric() {
542                        for lc in c.to_lowercase() {
543                            self.token_buffer.push(lc);
544                        }
545                    }
546                }
547
548                if self.token_buffer.is_empty() {
549                    continue;
550                }
551
552                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
553                    spur
554                } else {
555                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
556                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
557                    spur
558                };
559                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
560
561                if let Some(mode) = position_mode {
562                    let encoded_pos = match mode {
563                        PositionMode::Ordinal => element_ordinal << 20,
564                        PositionMode::TokenPosition => token_position,
565                        PositionMode::Full => (element_ordinal << 20) | token_position,
566                    };
567                    self.local_positions
568                        .entry(term_spur)
569                        .or_default()
570                        .push(encoded_pos);
571                }
572
573                token_position += 1;
574            }
575        }
576
577        // Phase 2: Insert aggregated terms into inverted index
578        // Now we only do one inverted_index lookup per unique term in doc
579        for (&term_spur, &tf) in &self.local_tf_buffer {
580            let term_key = TermKey {
581                field: field_id,
582                term: term_spur,
583            };
584
585            match self.inverted_index.entry(term_key) {
586                hashbrown::hash_map::Entry::Occupied(mut o) => {
587                    o.get_mut().add(doc_id, tf);
588                    self.estimated_memory += size_of::<CompactPosting>();
589                }
590                hashbrown::hash_map::Entry::Vacant(v) => {
591                    let mut posting = PostingListBuilder::new();
592                    posting.add(doc_id, tf);
593                    v.insert(posting);
594                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
595                }
596            }
597
598            if position_mode.is_some()
599                && let Some(positions) = self.local_positions.get(&term_spur)
600            {
601                match self.position_index.entry(term_key) {
602                    hashbrown::hash_map::Entry::Occupied(mut o) => {
603                        for &pos in positions {
604                            o.get_mut().add_position(doc_id, pos);
605                        }
606                        self.estimated_memory += positions.len() * size_of::<u32>();
607                    }
608                    hashbrown::hash_map::Entry::Vacant(v) => {
609                        let mut pos_posting = PositionPostingListBuilder::new();
610                        for &pos in positions {
611                            pos_posting.add_position(doc_id, pos);
612                        }
613                        self.estimated_memory +=
614                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
615                        v.insert(pos_posting);
616                    }
617                }
618            }
619        }
620
621        Ok(token_position)
622    }
623
624    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
625        use std::fmt::Write;
626
627        self.numeric_buffer.clear();
628        write!(self.numeric_buffer, "__num_{}", value).unwrap();
629        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
630            spur
631        } else {
632            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
633            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
634            spur
635        };
636
637        let term_key = TermKey {
638            field: field.0,
639            term: term_spur,
640        };
641
642        match self.inverted_index.entry(term_key) {
643            hashbrown::hash_map::Entry::Occupied(mut o) => {
644                o.get_mut().add(doc_id, 1);
645                self.estimated_memory += size_of::<CompactPosting>();
646            }
647            hashbrown::hash_map::Entry::Vacant(v) => {
648                let mut posting = PostingListBuilder::new();
649                posting.add(doc_id, 1);
650                v.insert(posting);
651                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
652            }
653        }
654
655        Ok(())
656    }
657
658    /// Index a dense vector field with ordinal tracking
659    fn index_dense_vector_field(
660        &mut self,
661        field: Field,
662        doc_id: DocId,
663        ordinal: u16,
664        vector: &[f32],
665    ) -> Result<()> {
666        let dim = vector.len();
667
668        let builder = self
669            .dense_vectors
670            .entry(field.0)
671            .or_insert_with(|| DenseVectorBuilder::new(dim));
672
673        // Verify dimension consistency
674        if builder.dim != dim && builder.len() > 0 {
675            return Err(crate::Error::Schema(format!(
676                "Dense vector dimension mismatch: expected {}, got {}",
677                builder.dim, dim
678            )));
679        }
680
681        builder.add(doc_id, ordinal, vector);
682
683        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
684
685        Ok(())
686    }
687
688    /// Index a sparse vector field using dedicated sparse posting lists
689    ///
690    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
691    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
692    ///
693    /// Weights below the configured `weight_threshold` are not indexed.
694    fn index_sparse_vector_field(
695        &mut self,
696        field: Field,
697        doc_id: DocId,
698        ordinal: u16,
699        entries: &[(u32, f32)],
700    ) -> Result<()> {
701        // Get weight threshold from field config (default 0.0 = no filtering)
702        let weight_threshold = self
703            .schema
704            .get_field_entry(field)
705            .and_then(|entry| entry.sparse_vector_config.as_ref())
706            .map(|config| config.weight_threshold)
707            .unwrap_or(0.0);
708
709        let builder = self
710            .sparse_vectors
711            .entry(field.0)
712            .or_insert_with(SparseVectorBuilder::new);
713
714        builder.inc_vector_count();
715
716        for &(dim_id, weight) in entries {
717            // Skip weights below threshold
718            if weight.abs() < weight_threshold {
719                continue;
720            }
721
722            let is_new_dim = !builder.postings.contains_key(&dim_id);
723            builder.add(dim_id, doc_id, ordinal, weight);
724            self.estimated_memory += size_of::<(DocId, u16, f32)>();
725            if is_new_dim {
726                // HashMap entry overhead + Vec header
727                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
728            }
729        }
730
731        Ok(())
732    }
733
734    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
735    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
736        use byteorder::{LittleEndian, WriteBytesExt};
737
738        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
739
740        self.store_file
741            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
742        self.store_file.write_all(&self.doc_serialize_buffer)?;
743
744        Ok(())
745    }
746
747    /// Build the final segment
748    ///
749    /// Streams all data directly to disk via StreamingWriter to avoid buffering
750    /// entire serialized outputs in memory. Each phase consumes and drops its
751    /// source data before the next phase begins.
752    pub async fn build<D: Directory + DirectoryWriter>(
753        mut self,
754        dir: &D,
755        segment_id: SegmentId,
756        trained: Option<&super::TrainedVectorStructures>,
757    ) -> Result<SegmentMeta> {
758        // Flush any buffered data
759        self.store_file.flush()?;
760
761        let files = SegmentFiles::new(segment_id.0);
762
763        // Phase 1: Stream positions directly to disk (consumes position_index)
764        let position_index = std::mem::take(&mut self.position_index);
765        let position_offsets = if !position_index.is_empty() {
766            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
767            let offsets = postings::build_positions_streaming(
768                position_index,
769                &self.term_interner,
770                &mut *pos_writer,
771            )?;
772            pos_writer.finish()?;
773            offsets
774        } else {
775            FxHashMap::default()
776        };
777
778        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
779        // These are fully independent: different source data, different output files.
780        let inverted_index = std::mem::take(&mut self.inverted_index);
781        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
782        let store_path = self.store_path.clone();
783        let num_compression_threads = self.config.num_compression_threads;
784        let compression_level = self.config.compression_level;
785        let dense_vectors = std::mem::take(&mut self.dense_vectors);
786        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
787        let schema = &self.schema;
788
789        // Pre-create all streaming writers (async) before entering sync rayon scope
790        // Wrapped in OffsetWriter to track bytes written per phase.
791        let mut term_dict_writer =
792            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
793        let mut postings_writer =
794            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
795        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
796        let mut vectors_writer = if !dense_vectors.is_empty() {
797            Some(super::OffsetWriter::new(
798                dir.streaming_writer(&files.vectors).await?,
799            ))
800        } else {
801            None
802        };
803        let mut sparse_writer = if !sparse_vectors.is_empty() {
804            Some(super::OffsetWriter::new(
805                dir.streaming_writer(&files.sparse).await?,
806            ))
807        } else {
808            None
809        };
810        let mut fast_fields = std::mem::take(&mut self.fast_fields);
811        let num_docs = self.next_doc_id;
812        let mut fast_writer = if !fast_fields.is_empty() {
813            Some(super::OffsetWriter::new(
814                dir.streaming_writer(&files.fast).await?,
815            ))
816        } else {
817            None
818        };
819
820        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
821            rayon::join(
822                || {
823                    rayon::join(
824                        || {
825                            postings::build_postings_streaming(
826                                inverted_index,
827                                term_interner,
828                                &position_offsets,
829                                &mut term_dict_writer,
830                                &mut postings_writer,
831                            )
832                        },
833                        || {
834                            store::build_store_streaming(
835                                &store_path,
836                                num_compression_threads,
837                                compression_level,
838                                &mut store_writer,
839                                num_docs,
840                            )
841                        },
842                    )
843                },
844                || {
845                    rayon::join(
846                        || {
847                            rayon::join(
848                                || -> Result<()> {
849                                    if let Some(ref mut w) = vectors_writer {
850                                        dense::build_vectors_streaming(
851                                            dense_vectors,
852                                            schema,
853                                            trained,
854                                            w,
855                                        )?;
856                                    }
857                                    Ok(())
858                                },
859                                || -> Result<()> {
860                                    if let Some(ref mut w) = sparse_writer {
861                                        sparse::build_sparse_streaming(
862                                            &mut sparse_vectors,
863                                            schema,
864                                            w,
865                                        )?;
866                                    }
867                                    Ok(())
868                                },
869                            )
870                        },
871                        || -> Result<()> {
872                            if let Some(ref mut w) = fast_writer {
873                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
874                            }
875                            Ok(())
876                        },
877                    )
878                },
879            );
880        postings_result?;
881        store_result?;
882        vectors_result?;
883        sparse_result?;
884        fast_result?;
885
886        let term_dict_bytes = term_dict_writer.offset() as usize;
887        let postings_bytes = postings_writer.offset() as usize;
888        let store_bytes = store_writer.offset() as usize;
889        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
890        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
891        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
892
893        term_dict_writer.finish()?;
894        postings_writer.finish()?;
895        store_writer.finish()?;
896        if let Some(w) = vectors_writer {
897            w.finish()?;
898        }
899        if let Some(w) = sparse_writer {
900            w.finish()?;
901        }
902        if let Some(w) = fast_writer {
903            w.finish()?;
904        }
905        drop(position_offsets);
906        drop(sparse_vectors);
907
908        log::info!(
909            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
910            num_docs,
911            super::format_bytes(term_dict_bytes),
912            super::format_bytes(postings_bytes),
913            super::format_bytes(store_bytes),
914            super::format_bytes(vectors_bytes),
915            super::format_bytes(sparse_bytes),
916            super::format_bytes(fast_bytes),
917        );
918
919        let meta = SegmentMeta {
920            id: segment_id.0,
921            num_docs: self.next_doc_id,
922            field_stats: self.field_stats.clone(),
923        };
924
925        dir.write(&files.meta, &meta.serialize()?).await?;
926
927        // Cleanup temp files
928        let _ = std::fs::remove_file(&self.store_path);
929
930        Ok(meta)
931    }
932}
933
934/// Serialize all fast-field columns to a `.fast` file.
935fn build_fast_fields_streaming(
936    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
937    num_docs: u32,
938    writer: &mut dyn Write,
939) -> Result<()> {
940    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
941
942    if fast_fields.is_empty() {
943        return Ok(());
944    }
945
946    // Sort fields by id for deterministic output
947    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
948    field_ids.sort_unstable();
949
950    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
951    let mut current_offset = 0u64;
952
953    for &field_id in &field_ids {
954        let ff = fast_fields.get_mut(&field_id).unwrap();
955        ff.pad_to(num_docs);
956
957        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
958        toc.field_id = field_id;
959        current_offset += bytes_written;
960        toc_entries.push(toc);
961    }
962
963    // Write TOC + footer
964    let toc_offset = current_offset;
965    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
966
967    Ok(())
968}
969
970impl Drop for SegmentBuilder {
971    fn drop(&mut self) {
972        // Cleanup temp files on drop
973        let _ = std::fs::remove_file(&self.store_path);
974    }
975}