Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16pub(crate) mod graph_bisection;
17mod postings;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23use std::fs::{File, OpenOptions};
24use std::io::{BufWriter, Write};
25use std::mem::size_of;
26use std::path::PathBuf;
27
28use hashbrown::HashMap;
29use lasso::{Rodeo, Spur};
30use rustc_hash::FxHashMap;
31
32use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
33use std::sync::Arc;
34
35use crate::directories::{Directory, DirectoryWriter};
36use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
37use crate::tokenizer::BoxedTokenizer;
38use crate::{DocId, Result};
39
40use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
41use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
42use sparse::SparseVectorBuilder;
43
44/// Size of the document store buffer before writing to disk
45const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
46
47/// Memory overhead per new term in the inverted index:
48/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
49const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
50
51/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
52const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
53
54/// Memory overhead per new term in the position index
55const NEW_POS_TERM_OVERHEAD: usize =
56    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
57
58/// Segment builder with optimized memory usage
59///
60/// Features:
61/// - Streams documents to disk immediately (no in-memory document storage)
62/// - Uses string interning for terms (reduced allocations)
63/// - Uses hashbrown HashMap (faster than BTreeMap)
64pub struct SegmentBuilder {
65    schema: Arc<Schema>,
66    config: SegmentBuilderConfig,
67    tokenizers: FxHashMap<Field, BoxedTokenizer>,
68
69    /// String interner for terms - O(1) lookup and deduplication
70    term_interner: Rodeo,
71
72    /// Inverted index: term key -> posting list
73    inverted_index: HashMap<TermKey, PostingListBuilder>,
74
75    /// Streaming document store writer
76    store_file: BufWriter<File>,
77    store_path: PathBuf,
78
79    /// Document count
80    next_doc_id: DocId,
81
82    /// Per-field statistics for BM25F
83    field_stats: FxHashMap<u32, FieldStats>,
84
85    /// Per-document field lengths stored compactly
86    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
87    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
88    doc_field_lengths: Vec<u32>,
89    num_indexed_fields: usize,
90    field_to_slot: FxHashMap<u32, usize>,
91
92    /// Reusable buffer for per-document term frequency aggregation
93    /// Avoids allocating a new hashmap for each document
94    local_tf_buffer: FxHashMap<Spur, u32>,
95
96    /// Reusable buffer for per-document position tracking (when positions enabled)
97    /// Avoids allocating a new hashmap for each text field per document
98    local_positions: FxHashMap<Spur, Vec<u32>>,
99
100    /// Reusable buffer for tokenization to avoid per-token String allocations
101    token_buffer: String,
102
103    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
104    numeric_buffer: String,
105
106    /// Dense vector storage per field: field -> (doc_ids, vectors)
107    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
108    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
109
110    /// Binary dense vector storage per field: field -> packed-bit vectors
111    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,
112
113    /// Sparse vector storage per field: field -> SparseVectorBuilder
114    /// Uses proper BlockSparsePostingList with configurable quantization
115    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
116
117    /// Position index for fields with positions enabled
118    /// term key -> position posting list
119    position_index: HashMap<TermKey, PositionPostingListBuilder>,
120
121    /// Fields that have position tracking enabled, with their mode
122    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
123
124    /// Current element ordinal for multi-valued fields (reset per document)
125    current_element_ordinal: FxHashMap<u32, u32>,
126
127    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
128    estimated_memory: usize,
129
130    /// Reusable buffer for document serialization (avoids per-document allocation)
131    doc_serialize_buffer: Vec<u8>,
132
133    /// Fast-field columnar writers per field_id (only for fields with fast=true)
134    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
135}
136
137impl SegmentBuilder {
138    /// Create a new segment builder
139    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
140        let segment_id = uuid::Uuid::new_v4();
141        let store_path = config
142            .temp_dir
143            .join(format!("hermes_store_{}.tmp", segment_id));
144
145        let store_file = BufWriter::with_capacity(
146            STORE_BUFFER_SIZE,
147            OpenOptions::new()
148                .create(true)
149                .write(true)
150                .truncate(true)
151                .open(&store_path)?,
152        );
153
154        // Count indexed fields, track positions, and auto-configure tokenizers
155        let registry = crate::tokenizer::TokenizerRegistry::new();
156        let mut num_indexed_fields = 0;
157        let mut field_to_slot = FxHashMap::default();
158        let mut position_enabled_fields = FxHashMap::default();
159        let mut tokenizers = FxHashMap::default();
160        for (field, entry) in schema.fields() {
161            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
162                field_to_slot.insert(field.0, num_indexed_fields);
163                num_indexed_fields += 1;
164                if entry.positions.is_some() {
165                    position_enabled_fields.insert(field.0, entry.positions);
166                }
167                if let Some(ref tok_name) = entry.tokenizer
168                    && let Some(tokenizer) = registry.get(tok_name)
169                {
170                    tokenizers.insert(field, tokenizer);
171                }
172            }
173        }
174
175        // Initialize fast-field writers for fields with fast=true
176        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
177        let mut fast_fields = FxHashMap::default();
178        for (field, entry) in schema.fields() {
179            if entry.fast {
180                let writer = if entry.multi {
181                    match entry.field_type {
182                        FieldType::U64 => {
183                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
184                        }
185                        FieldType::I64 => {
186                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
187                        }
188                        FieldType::F64 => {
189                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
190                        }
191                        FieldType::Text => FastFieldWriter::new_text_multi(),
192                        _ => continue,
193                    }
194                } else {
195                    match entry.field_type {
196                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
197                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
198                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
199                        FieldType::Text => FastFieldWriter::new_text(),
200                        _ => continue,
201                    }
202                };
203                fast_fields.insert(field.0, writer);
204            }
205        }
206
207        Ok(Self {
208            schema,
209            tokenizers,
210            term_interner: Rodeo::new(),
211            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
212            store_file,
213            store_path,
214            next_doc_id: 0,
215            field_stats: FxHashMap::default(),
216            doc_field_lengths: Vec::new(),
217            num_indexed_fields,
218            field_to_slot,
219            local_tf_buffer: FxHashMap::default(),
220            local_positions: FxHashMap::default(),
221            token_buffer: String::with_capacity(64),
222            numeric_buffer: String::with_capacity(32),
223            config,
224            dense_vectors: FxHashMap::default(),
225            binary_dense_vectors: FxHashMap::default(),
226            sparse_vectors: FxHashMap::default(),
227            position_index: HashMap::new(),
228            position_enabled_fields,
229            current_element_ordinal: FxHashMap::default(),
230            estimated_memory: 0,
231            doc_serialize_buffer: Vec::with_capacity(256),
232            fast_fields,
233        })
234    }
235
236    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
237        self.tokenizers.insert(field, tokenizer);
238    }
239
240    /// Get the current element ordinal for a field and increment it.
241    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
242    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
243        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
244        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
245        ordinal
246    }
247
248    pub fn num_docs(&self) -> u32 {
249        self.next_doc_id
250    }
251
252    /// Fast O(1) memory estimate - updated incrementally during indexing
253    #[inline]
254    pub fn estimated_memory_bytes(&self) -> usize {
255        self.estimated_memory
256    }
257
258    /// Count total unique sparse dimensions across all fields
259    pub fn sparse_dim_count(&self) -> usize {
260        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
261    }
262
263    /// Get current statistics for debugging performance (expensive - iterates all data)
264    pub fn stats(&self) -> SegmentBuilderStats {
265        use std::mem::size_of;
266
267        let postings_in_memory: usize =
268            self.inverted_index.values().map(|p| p.postings.len()).sum();
269
270        // Size constants computed from actual types
271        let compact_posting_size = size_of::<CompactPosting>();
272        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
273        let term_key_size = size_of::<TermKey>();
274        let posting_builder_size = size_of::<PostingListBuilder>();
275        let spur_size = size_of::<lasso::Spur>();
276        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
277
278        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
279        // Measured: ~(key_size + value_size + 8) per entry on average
280        let hashmap_entry_base_overhead = 8usize;
281
282        // FxHashMap uses same layout as hashbrown
283        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
284
285        // Postings memory
286        let postings_bytes: usize = self
287            .inverted_index
288            .values()
289            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
290            .sum();
291
292        // Inverted index overhead
293        let index_overhead_bytes = self.inverted_index.len()
294            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
295
296        // Term interner: Rodeo stores strings + metadata
297        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
298        let interner_arena_overhead = 2 * size_of::<usize>();
299        let avg_term_len = 8; // Estimated average term length
300        let interner_bytes =
301            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
302
303        // Doc field lengths
304        let field_lengths_bytes =
305            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
306
307        // Dense vectors
308        let mut dense_vectors_bytes: usize = 0;
309        let mut dense_vector_count: usize = 0;
310        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
311        for b in self.dense_vectors.values() {
312            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
313                + b.doc_ids.capacity() * doc_id_ordinal_size
314                + 2 * vec_overhead; // Two Vecs
315            dense_vector_count += b.doc_ids.len();
316        }
317        // Binary dense vectors
318        for b in self.binary_dense_vectors.values() {
319            dense_vectors_bytes += b.vectors.capacity()
320                + b.doc_ids.capacity() * doc_id_ordinal_size
321                + 2 * vec_overhead;
322            dense_vector_count += b.doc_ids.len();
323        }
324
325        // Local buffers
326        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
327        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
328
329        // Sparse vectors
330        let mut sparse_vectors_bytes: usize = 0;
331        for builder in self.sparse_vectors.values() {
332            for postings in builder.postings.values() {
333                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
334            }
335            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
336            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
337            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
338        }
339        // Outer FxHashMap overhead
340        let outer_sparse_entry_size =
341            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
342        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
343
344        // Position index
345        let mut position_index_bytes: usize = 0;
346        for pos_builder in self.position_index.values() {
347            for (_, positions) in &pos_builder.postings {
348                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
349            }
350            // Vec<(DocId, Vec<u32>)> entry size
351            let pos_entry_size = size_of::<DocId>() + vec_overhead;
352            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
353        }
354        // HashMap overhead for position_index
355        let pos_index_entry_size =
356            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
357        position_index_bytes += self.position_index.len() * pos_index_entry_size;
358
359        let estimated_memory_bytes = postings_bytes
360            + index_overhead_bytes
361            + interner_bytes
362            + field_lengths_bytes
363            + dense_vectors_bytes
364            + local_tf_buffer_bytes
365            + sparse_vectors_bytes
366            + position_index_bytes;
367
368        let memory_breakdown = MemoryBreakdown {
369            postings_bytes,
370            index_overhead_bytes,
371            interner_bytes,
372            field_lengths_bytes,
373            dense_vectors_bytes,
374            dense_vector_count,
375            sparse_vectors_bytes,
376            position_index_bytes,
377        };
378
379        SegmentBuilderStats {
380            num_docs: self.next_doc_id,
381            unique_terms: self.inverted_index.len(),
382            postings_in_memory,
383            interned_strings: self.term_interner.len(),
384            doc_field_lengths_size: self.doc_field_lengths.len(),
385            estimated_memory_bytes,
386            memory_breakdown,
387        }
388    }
389
390    /// Add a document - streams to disk immediately
391    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
392        let doc_id = self.next_doc_id;
393        self.next_doc_id += 1;
394
395        // Initialize field lengths for this document
396        let base_idx = self.doc_field_lengths.len();
397        self.doc_field_lengths
398            .resize(base_idx + self.num_indexed_fields, 0);
399        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
400
401        // Reset element ordinals for this document (for multi-valued fields)
402        self.current_element_ordinal.clear();
403
404        for (field, value) in doc.field_values() {
405            let Some(entry) = self.schema.get_field_entry(*field) else {
406                continue;
407            };
408
409            // Dense/binary vectors are written to .vectors when indexed || stored
410            // Other field types require indexed or fast
411            if !matches!(
412                &entry.field_type,
413                FieldType::DenseVector | FieldType::BinaryDenseVector
414            ) && !entry.indexed
415                && !entry.fast
416            {
417                continue;
418            }
419
420            match (&entry.field_type, value) {
421                (FieldType::Text, FieldValue::Text(text)) => {
422                    if entry.indexed {
423                        let element_ordinal = self.next_element_ordinal(field.0);
424                        let token_count =
425                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
426
427                        let stats = self.field_stats.entry(field.0).or_default();
428                        stats.total_tokens += token_count as u64;
429                        if element_ordinal == 0 {
430                            stats.doc_count += 1;
431                        }
432
433                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
434                            self.doc_field_lengths[base_idx + slot] = token_count;
435                        }
436                    }
437
438                    // Fast-field: store raw text for text ordinal column
439                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
440                        ff.add_text(doc_id, text);
441                    }
442                }
443                (FieldType::U64, FieldValue::U64(v)) => {
444                    if entry.indexed {
445                        self.index_numeric_field(*field, doc_id, *v)?;
446                    }
447                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
448                        ff.add_u64(doc_id, *v);
449                    }
450                }
451                (FieldType::I64, FieldValue::I64(v)) => {
452                    if entry.indexed {
453                        self.index_numeric_field(*field, doc_id, *v as u64)?;
454                    }
455                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
456                        ff.add_i64(doc_id, *v);
457                    }
458                }
459                (FieldType::F64, FieldValue::F64(v)) => {
460                    if entry.indexed {
461                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
462                    }
463                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
464                        ff.add_f64(doc_id, *v);
465                    }
466                }
467                (FieldType::DenseVector, FieldValue::DenseVector(vec))
468                    if entry.indexed || entry.stored =>
469                {
470                    let ordinal = self.next_element_ordinal(field.0);
471                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
472                }
473                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
474                    if entry.indexed || entry.stored =>
475                {
476                    let ordinal = self.next_element_ordinal(field.0);
477                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
478                }
479                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
480                    let ordinal = self.next_element_ordinal(field.0);
481                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
482                }
483                _ => {}
484            }
485        }
486
487        // Stream document to disk immediately
488        self.write_document_to_store(&doc)?;
489
490        Ok(doc_id)
491    }
492
493    /// Index a text field using interned terms
494    ///
495    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
496    /// otherwise falls back to an inline zero-allocation path (split_whitespace
497    /// + lowercase + strip non-alphanumeric).
498    ///
499    /// If position recording is enabled for this field, also records token positions
500    /// encoded as (element_ordinal << 20) | token_position.
501    fn index_text_field(
502        &mut self,
503        field: Field,
504        doc_id: DocId,
505        text: &str,
506        element_ordinal: u32,
507    ) -> Result<u32> {
508        use crate::dsl::PositionMode;
509
510        let field_id = field.0;
511        let position_mode = self
512            .position_enabled_fields
513            .get(&field_id)
514            .copied()
515            .flatten();
516
517        // Phase 1: Aggregate term frequencies within this document
518        // Also collect positions if enabled
519        // Reuse buffers to avoid allocations
520        self.local_tf_buffer.clear();
521        // Clear position Vecs in-place (keeps allocated capacity for reuse)
522        for v in self.local_positions.values_mut() {
523            v.clear();
524        }
525
526        let mut token_position = 0u32;
527
528        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
529        // The owned Vec<Token> is computed first so the immutable borrow of
530        // self.tokenizers ends before we mutate other fields.
531        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
532
533        if let Some(tokens) = custom_tokens {
534            // Custom tokenizer path
535            for token in &tokens {
536                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
537                    spur
538                } else {
539                    let spur = self.term_interner.get_or_intern(&token.text);
540                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
541                    spur
542                };
543                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
544
545                if let Some(mode) = position_mode {
546                    let encoded_pos = match mode {
547                        PositionMode::Ordinal => element_ordinal << 20,
548                        PositionMode::TokenPosition => token.position,
549                        PositionMode::Full => (element_ordinal << 20) | token.position,
550                    };
551                    self.local_positions
552                        .entry(term_spur)
553                        .or_default()
554                        .push(encoded_pos);
555                }
556            }
557            token_position = tokens.len() as u32;
558        } else {
559            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
560            for word in text.split_whitespace() {
561                self.token_buffer.clear();
562                for c in word.chars() {
563                    if c.is_alphanumeric() {
564                        for lc in c.to_lowercase() {
565                            self.token_buffer.push(lc);
566                        }
567                    }
568                }
569
570                if self.token_buffer.is_empty() {
571                    continue;
572                }
573
574                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
575                    spur
576                } else {
577                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
578                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
579                    spur
580                };
581                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
582
583                if let Some(mode) = position_mode {
584                    let encoded_pos = match mode {
585                        PositionMode::Ordinal => element_ordinal << 20,
586                        PositionMode::TokenPosition => token_position,
587                        PositionMode::Full => (element_ordinal << 20) | token_position,
588                    };
589                    self.local_positions
590                        .entry(term_spur)
591                        .or_default()
592                        .push(encoded_pos);
593                }
594
595                token_position += 1;
596            }
597        }
598
599        // Phase 2: Insert aggregated terms into inverted index
600        // Now we only do one inverted_index lookup per unique term in doc
601        for (&term_spur, &tf) in &self.local_tf_buffer {
602            let term_key = TermKey {
603                field: field_id,
604                term: term_spur,
605            };
606
607            match self.inverted_index.entry(term_key) {
608                hashbrown::hash_map::Entry::Occupied(mut o) => {
609                    o.get_mut().add(doc_id, tf);
610                    self.estimated_memory += size_of::<CompactPosting>();
611                }
612                hashbrown::hash_map::Entry::Vacant(v) => {
613                    let mut posting = PostingListBuilder::new();
614                    posting.add(doc_id, tf);
615                    v.insert(posting);
616                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
617                }
618            }
619
620            if position_mode.is_some()
621                && let Some(positions) = self.local_positions.get(&term_spur)
622            {
623                match self.position_index.entry(term_key) {
624                    hashbrown::hash_map::Entry::Occupied(mut o) => {
625                        for &pos in positions {
626                            o.get_mut().add_position(doc_id, pos);
627                        }
628                        self.estimated_memory += positions.len() * size_of::<u32>();
629                    }
630                    hashbrown::hash_map::Entry::Vacant(v) => {
631                        let mut pos_posting = PositionPostingListBuilder::new();
632                        for &pos in positions {
633                            pos_posting.add_position(doc_id, pos);
634                        }
635                        self.estimated_memory +=
636                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
637                        v.insert(pos_posting);
638                    }
639                }
640            }
641        }
642
643        Ok(token_position)
644    }
645
646    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
647        use std::fmt::Write;
648
649        self.numeric_buffer.clear();
650        write!(self.numeric_buffer, "__num_{}", value).unwrap();
651        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
652            spur
653        } else {
654            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
655            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
656            spur
657        };
658
659        let term_key = TermKey {
660            field: field.0,
661            term: term_spur,
662        };
663
664        match self.inverted_index.entry(term_key) {
665            hashbrown::hash_map::Entry::Occupied(mut o) => {
666                o.get_mut().add(doc_id, 1);
667                self.estimated_memory += size_of::<CompactPosting>();
668            }
669            hashbrown::hash_map::Entry::Vacant(v) => {
670                let mut posting = PostingListBuilder::new();
671                posting.add(doc_id, 1);
672                v.insert(posting);
673                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
674            }
675        }
676
677        Ok(())
678    }
679
680    /// Index a dense vector field with ordinal tracking
681    fn index_dense_vector_field(
682        &mut self,
683        field: Field,
684        doc_id: DocId,
685        ordinal: u16,
686        vector: &[f32],
687    ) -> Result<()> {
688        let dim = vector.len();
689
690        let builder = self
691            .dense_vectors
692            .entry(field.0)
693            .or_insert_with(|| DenseVectorBuilder::new(dim));
694
695        // Verify dimension consistency
696        if builder.dim != dim && builder.len() > 0 {
697            return Err(crate::Error::Schema(format!(
698                "Dense vector dimension mismatch: expected {}, got {}",
699                builder.dim, dim
700            )));
701        }
702
703        builder.add(doc_id, ordinal, vector);
704
705        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
706
707        Ok(())
708    }
709
710    /// Index a binary dense vector field with ordinal tracking
711    fn index_binary_dense_vector_field(
712        &mut self,
713        field: Field,
714        doc_id: DocId,
715        ordinal: u16,
716        bytes: &[u8],
717    ) -> Result<()> {
718        let dim_bits = self
719            .schema
720            .get_field_entry(field)
721            .and_then(|e| e.binary_dense_vector_config.as_ref())
722            .map(|c| c.dim)
723            .ok_or_else(|| {
724                crate::Error::Schema("BinaryDenseVector field missing config".to_string())
725            })?;
726
727        let expected_byte_len = dim_bits.div_ceil(8);
728        if bytes.len() != expected_byte_len {
729            return Err(crate::Error::Schema(format!(
730                "Binary vector byte length mismatch: expected {} (dim={}), got {}",
731                expected_byte_len,
732                dim_bits,
733                bytes.len()
734            )));
735        }
736
737        let builder = self
738            .binary_dense_vectors
739            .entry(field.0)
740            .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
741
742        builder.add(doc_id, ordinal, bytes);
743        self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
744
745        Ok(())
746    }
747
748    /// Index a sparse vector field using dedicated sparse posting lists
749    ///
750    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
751    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
752    ///
753    /// Weights below the configured `weight_threshold` are not indexed.
754    fn index_sparse_vector_field(
755        &mut self,
756        field: Field,
757        doc_id: DocId,
758        ordinal: u16,
759        entries: &[(u32, f32)],
760    ) -> Result<()> {
761        // Get weight threshold from field config (default 0.0 = no filtering)
762        let weight_threshold = self
763            .schema
764            .get_field_entry(field)
765            .and_then(|entry| entry.sparse_vector_config.as_ref())
766            .map(|config| config.weight_threshold)
767            .unwrap_or(0.0);
768
769        let builder = self
770            .sparse_vectors
771            .entry(field.0)
772            .or_insert_with(SparseVectorBuilder::new);
773
774        builder.inc_vector_count();
775
776        for &(dim_id, weight) in entries {
777            // Skip weights below threshold
778            if weight.abs() < weight_threshold {
779                continue;
780            }
781
782            let is_new_dim = !builder.postings.contains_key(&dim_id);
783            builder.add(dim_id, doc_id, ordinal, weight);
784            self.estimated_memory += size_of::<(DocId, u16, f32)>();
785            if is_new_dim {
786                // HashMap entry overhead + Vec header
787                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
788            }
789        }
790
791        Ok(())
792    }
793
794    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
795    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
796        use byteorder::{LittleEndian, WriteBytesExt};
797
798        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
799
800        self.store_file
801            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
802        self.store_file.write_all(&self.doc_serialize_buffer)?;
803
804        Ok(())
805    }
806
807    /// Build the final segment
808    ///
809    /// Streams all data directly to disk via StreamingWriter to avoid buffering
810    /// entire serialized outputs in memory. Each phase consumes and drops its
811    /// source data before the next phase begins.
812    pub async fn build<D: Directory + DirectoryWriter>(
813        mut self,
814        dir: &D,
815        segment_id: SegmentId,
816        trained: Option<&super::TrainedVectorStructures>,
817    ) -> Result<SegmentMeta> {
818        // Flush any buffered data
819        self.store_file.flush()?;
820
821        let files = SegmentFiles::new(segment_id.0);
822
823        // Phase 1: Stream positions directly to disk (consumes position_index)
824        let position_index = std::mem::take(&mut self.position_index);
825        let position_offsets = if !position_index.is_empty() {
826            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
827            let offsets = postings::build_positions_streaming(
828                position_index,
829                &self.term_interner,
830                &mut *pos_writer,
831            )?;
832            pos_writer.finish()?;
833            offsets
834        } else {
835            FxHashMap::default()
836        };
837
838        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
839        // These are fully independent: different source data, different output files.
840        let inverted_index = std::mem::take(&mut self.inverted_index);
841        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
842        let store_path = self.store_path.clone();
843        let num_compression_threads = self.config.num_compression_threads;
844        let compression_level = self.config.compression_level;
845        let dense_vectors = std::mem::take(&mut self.dense_vectors);
846        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
847        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
848        let schema = &self.schema;
849
850        // Pre-create all streaming writers (async) before entering sync rayon scope
851        // Wrapped in OffsetWriter to track bytes written per phase.
852        let mut term_dict_writer =
853            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
854        let mut postings_writer =
855            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
856        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
857        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
858            Some(super::OffsetWriter::new(
859                dir.streaming_writer(&files.vectors).await?,
860            ))
861        } else {
862            None
863        };
864        let mut sparse_writer = if !sparse_vectors.is_empty() {
865            Some(super::OffsetWriter::new(
866                dir.streaming_writer(&files.sparse).await?,
867            ))
868        } else {
869            None
870        };
871        let mut fast_fields = std::mem::take(&mut self.fast_fields);
872        let num_docs = self.next_doc_id;
873        let mut fast_writer = if !fast_fields.is_empty() {
874            Some(super::OffsetWriter::new(
875                dir.streaming_writer(&files.fast).await?,
876            ))
877        } else {
878            None
879        };
880
881        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
882            rayon::join(
883                || {
884                    rayon::join(
885                        || {
886                            postings::build_postings_streaming(
887                                inverted_index,
888                                term_interner,
889                                &position_offsets,
890                                &mut term_dict_writer,
891                                &mut postings_writer,
892                            )
893                        },
894                        || {
895                            store::build_store_streaming(
896                                &store_path,
897                                num_compression_threads,
898                                compression_level,
899                                &mut store_writer,
900                                num_docs,
901                            )
902                        },
903                    )
904                },
905                || {
906                    rayon::join(
907                        || {
908                            rayon::join(
909                                || -> Result<()> {
910                                    if let Some(ref mut w) = vectors_writer {
911                                        dense::build_vectors_streaming(
912                                            dense_vectors,
913                                            binary_dense_vectors,
914                                            schema,
915                                            trained,
916                                            w,
917                                        )?;
918                                    }
919                                    Ok(())
920                                },
921                                || -> Result<()> {
922                                    if let Some(ref mut w) = sparse_writer {
923                                        sparse::build_sparse_streaming(
924                                            &mut sparse_vectors,
925                                            schema,
926                                            w,
927                                        )?;
928                                    }
929                                    Ok(())
930                                },
931                            )
932                        },
933                        || -> Result<()> {
934                            if let Some(ref mut w) = fast_writer {
935                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
936                            }
937                            Ok(())
938                        },
939                    )
940                },
941            );
942        postings_result?;
943        store_result?;
944        vectors_result?;
945        sparse_result?;
946        fast_result?;
947
948        let term_dict_bytes = term_dict_writer.offset() as usize;
949        let postings_bytes = postings_writer.offset() as usize;
950        let store_bytes = store_writer.offset() as usize;
951        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
952        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
953        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
954
955        term_dict_writer.finish()?;
956        postings_writer.finish()?;
957        store_writer.finish()?;
958        if let Some(w) = vectors_writer {
959            w.finish()?;
960        }
961        if let Some(w) = sparse_writer {
962            w.finish()?;
963        }
964        if let Some(w) = fast_writer {
965            w.finish()?;
966        }
967        drop(position_offsets);
968        drop(sparse_vectors);
969
970        log::info!(
971            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
972            num_docs,
973            super::format_bytes(term_dict_bytes),
974            super::format_bytes(postings_bytes),
975            super::format_bytes(store_bytes),
976            super::format_bytes(vectors_bytes),
977            super::format_bytes(sparse_bytes),
978            super::format_bytes(fast_bytes),
979        );
980
981        let meta = SegmentMeta {
982            id: segment_id.0,
983            num_docs: self.next_doc_id,
984            field_stats: self.field_stats.clone(),
985        };
986
987        dir.write(&files.meta, &meta.serialize()?).await?;
988
989        // Cleanup temp files
990        let _ = std::fs::remove_file(&self.store_path);
991
992        Ok(meta)
993    }
994}
995
996/// Serialize all fast-field columns to a `.fast` file.
997fn build_fast_fields_streaming(
998    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
999    num_docs: u32,
1000    writer: &mut dyn Write,
1001) -> Result<()> {
1002    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1003
1004    if fast_fields.is_empty() {
1005        return Ok(());
1006    }
1007
1008    // Sort fields by id for deterministic output
1009    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1010    field_ids.sort_unstable();
1011
1012    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1013    let mut current_offset = 0u64;
1014
1015    for &field_id in &field_ids {
1016        let ff = fast_fields.get_mut(&field_id).unwrap();
1017        ff.pad_to(num_docs);
1018
1019        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1020        toc.field_id = field_id;
1021        current_offset += bytes_written;
1022        toc_entries.push(toc);
1023    }
1024
1025    // Write TOC + footer
1026    let toc_offset = current_offset;
1027    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1028
1029    Ok(())
1030}
1031
1032impl Drop for SegmentBuilder {
1033    fn drop(&mut self) {
1034        // Cleanup temp files on drop
1035        let _ = std::fs::remove_file(&self.store_path);
1036    }
1037}