Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11mod config;
12mod dense;
13#[cfg(feature = "diagnostics")]
14mod diagnostics;
15mod postings;
16mod sparse;
17mod store;
18
19pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
20
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::mem::size_of;
24use std::path::PathBuf;
25
26use hashbrown::HashMap;
27use lasso::{Rodeo, Spur};
28use rustc_hash::FxHashMap;
29
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31use std::sync::Arc;
32
33use crate::directories::{Directory, DirectoryWriter};
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35use crate::tokenizer::BoxedTokenizer;
36use crate::{DocId, Result};
37
38use dense::DenseVectorBuilder;
39use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
40use sparse::SparseVectorBuilder;
41
/// Size of the document store buffer before writing to disk.
/// Used as the `BufWriter` capacity for the temporary store file.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Memory overhead per new term in the inverted index:
/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header.
/// Added to the incremental memory estimate the first time a term key is inserted.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize).
/// The string's own byte length is added separately at the call sites.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Memory overhead per new term in the position index.
/// Added to the incremental memory estimate the first time a term key gets positions.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
55
/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// Documents are added with `add_document`, which assigns sequential doc ids
/// starting at 0; the accumulated state is consumed by `build`.
pub struct SegmentBuilder {
    /// Schema used to look up each field's type and indexing options
    schema: Arc<Schema>,
    /// Builder settings (temp directory, posting map capacity, compression options)
    config: SegmentBuilderConfig,
    /// Custom tokenizers registered via `set_tokenizer`; text fields without an
    /// entry are tokenized by the built-in whitespace/lowercase path
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer (buffered writer over the temp file at `store_path`)
    store_file: BufWriter<File>,
    /// Location of the temporary store file created in `new`
    store_path: PathBuf,

    /// Document count; also the id that will be assigned to the next added document
    next_doc_id: DocId,

    /// Per-field statistics for BM25F (total token count and document count)
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields, i.e. the number of slots per document
    /// in `doc_field_lengths`
    num_indexed_fields: usize,
    /// Field id -> slot index within a document's run of `doc_field_lengths`
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for per-document position tracking (when positions enabled)
    /// Avoids allocating a new hashmap for each text field per document
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
    numeric_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    /// (populated in `new` only for indexed text fields whose `positions` is set)
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    estimated_memory: usize,

    /// Reusable buffer for document serialization (avoids per-document allocation)
    doc_serialize_buffer: Vec<u8>,

    /// Fast-field columnar writers per field_id (only for fields with fast=true)
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
131
132impl SegmentBuilder {
133    /// Create a new segment builder
134    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
135        let segment_id = uuid::Uuid::new_v4();
136        let store_path = config
137            .temp_dir
138            .join(format!("hermes_store_{}.tmp", segment_id));
139
140        let store_file = BufWriter::with_capacity(
141            STORE_BUFFER_SIZE,
142            OpenOptions::new()
143                .create(true)
144                .write(true)
145                .truncate(true)
146                .open(&store_path)?,
147        );
148
149        // Count indexed fields for compact field length storage
150        // Also track which fields have position recording enabled
151        let mut num_indexed_fields = 0;
152        let mut field_to_slot = FxHashMap::default();
153        let mut position_enabled_fields = FxHashMap::default();
154        for (field, entry) in schema.fields() {
155            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
156                field_to_slot.insert(field.0, num_indexed_fields);
157                num_indexed_fields += 1;
158                if entry.positions.is_some() {
159                    position_enabled_fields.insert(field.0, entry.positions);
160                }
161            }
162        }
163
164        // Initialize fast-field writers for fields with fast=true
165        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
166        let mut fast_fields = FxHashMap::default();
167        for (field, entry) in schema.fields() {
168            if entry.fast {
169                let writer = match entry.field_type {
170                    FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
171                    FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
172                    FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
173                    FieldType::Text => FastFieldWriter::new_text(),
174                    _ => continue, // fast not supported for other types
175                };
176                fast_fields.insert(field.0, writer);
177            }
178        }
179
180        Ok(Self {
181            schema,
182            tokenizers: FxHashMap::default(),
183            term_interner: Rodeo::new(),
184            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
185            store_file,
186            store_path,
187            next_doc_id: 0,
188            field_stats: FxHashMap::default(),
189            doc_field_lengths: Vec::new(),
190            num_indexed_fields,
191            field_to_slot,
192            local_tf_buffer: FxHashMap::default(),
193            local_positions: FxHashMap::default(),
194            token_buffer: String::with_capacity(64),
195            numeric_buffer: String::with_capacity(32),
196            config,
197            dense_vectors: FxHashMap::default(),
198            sparse_vectors: FxHashMap::default(),
199            position_index: HashMap::new(),
200            position_enabled_fields,
201            current_element_ordinal: FxHashMap::default(),
202            estimated_memory: 0,
203            doc_serialize_buffer: Vec::with_capacity(256),
204            fast_fields,
205        })
206    }
207
208    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
209        self.tokenizers.insert(field, tokenizer);
210    }
211
212    /// Get the current element ordinal for a field and increment it.
213    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
214    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
215        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
216        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
217        ordinal
218    }
219
220    pub fn num_docs(&self) -> u32 {
221        self.next_doc_id
222    }
223
224    /// Fast O(1) memory estimate - updated incrementally during indexing
225    #[inline]
226    pub fn estimated_memory_bytes(&self) -> usize {
227        self.estimated_memory
228    }
229
230    /// Count total unique sparse dimensions across all fields
231    pub fn sparse_dim_count(&self) -> usize {
232        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
233    }
234
235    /// Get current statistics for debugging performance (expensive - iterates all data)
236    pub fn stats(&self) -> SegmentBuilderStats {
237        use std::mem::size_of;
238
239        let postings_in_memory: usize =
240            self.inverted_index.values().map(|p| p.postings.len()).sum();
241
242        // Size constants computed from actual types
243        let compact_posting_size = size_of::<CompactPosting>();
244        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
245        let term_key_size = size_of::<TermKey>();
246        let posting_builder_size = size_of::<PostingListBuilder>();
247        let spur_size = size_of::<lasso::Spur>();
248        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
249
250        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
251        // Measured: ~(key_size + value_size + 8) per entry on average
252        let hashmap_entry_base_overhead = 8usize;
253
254        // FxHashMap uses same layout as hashbrown
255        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
256
257        // Postings memory
258        let postings_bytes: usize = self
259            .inverted_index
260            .values()
261            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
262            .sum();
263
264        // Inverted index overhead
265        let index_overhead_bytes = self.inverted_index.len()
266            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
267
268        // Term interner: Rodeo stores strings + metadata
269        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
270        let interner_arena_overhead = 2 * size_of::<usize>();
271        let avg_term_len = 8; // Estimated average term length
272        let interner_bytes =
273            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
274
275        // Doc field lengths
276        let field_lengths_bytes =
277            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
278
279        // Dense vectors
280        let mut dense_vectors_bytes: usize = 0;
281        let mut dense_vector_count: usize = 0;
282        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
283        for b in self.dense_vectors.values() {
284            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
285                + b.doc_ids.capacity() * doc_id_ordinal_size
286                + 2 * vec_overhead; // Two Vecs
287            dense_vector_count += b.doc_ids.len();
288        }
289
290        // Local buffers
291        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
292        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
293
294        // Sparse vectors
295        let mut sparse_vectors_bytes: usize = 0;
296        for builder in self.sparse_vectors.values() {
297            for postings in builder.postings.values() {
298                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
299            }
300            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
301            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
302            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
303        }
304        // Outer FxHashMap overhead
305        let outer_sparse_entry_size =
306            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
307        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
308
309        // Position index
310        let mut position_index_bytes: usize = 0;
311        for pos_builder in self.position_index.values() {
312            for (_, positions) in &pos_builder.postings {
313                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
314            }
315            // Vec<(DocId, Vec<u32>)> entry size
316            let pos_entry_size = size_of::<DocId>() + vec_overhead;
317            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
318        }
319        // HashMap overhead for position_index
320        let pos_index_entry_size =
321            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
322        position_index_bytes += self.position_index.len() * pos_index_entry_size;
323
324        let estimated_memory_bytes = postings_bytes
325            + index_overhead_bytes
326            + interner_bytes
327            + field_lengths_bytes
328            + dense_vectors_bytes
329            + local_tf_buffer_bytes
330            + sparse_vectors_bytes
331            + position_index_bytes;
332
333        let memory_breakdown = MemoryBreakdown {
334            postings_bytes,
335            index_overhead_bytes,
336            interner_bytes,
337            field_lengths_bytes,
338            dense_vectors_bytes,
339            dense_vector_count,
340            sparse_vectors_bytes,
341            position_index_bytes,
342        };
343
344        SegmentBuilderStats {
345            num_docs: self.next_doc_id,
346            unique_terms: self.inverted_index.len(),
347            postings_in_memory,
348            interned_strings: self.term_interner.len(),
349            doc_field_lengths_size: self.doc_field_lengths.len(),
350            estimated_memory_bytes,
351            memory_breakdown,
352        }
353    }
354
355    /// Add a document - streams to disk immediately
356    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
357        let doc_id = self.next_doc_id;
358        self.next_doc_id += 1;
359
360        // Initialize field lengths for this document
361        let base_idx = self.doc_field_lengths.len();
362        self.doc_field_lengths
363            .resize(base_idx + self.num_indexed_fields, 0);
364        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
365
366        // Reset element ordinals for this document (for multi-valued fields)
367        self.current_element_ordinal.clear();
368
369        for (field, value) in doc.field_values() {
370            let Some(entry) = self.schema.get_field_entry(*field) else {
371                continue;
372            };
373
374            // Dense vectors are written to .vectors when indexed || stored
375            // Other field types require indexed
376            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed {
377                continue;
378            }
379
380            match (&entry.field_type, value) {
381                (FieldType::Text, FieldValue::Text(text)) => {
382                    let element_ordinal = self.next_element_ordinal(field.0);
383                    let token_count =
384                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
385
386                    let stats = self.field_stats.entry(field.0).or_default();
387                    stats.total_tokens += token_count as u64;
388                    if element_ordinal == 0 {
389                        stats.doc_count += 1;
390                    }
391
392                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
393                        self.doc_field_lengths[base_idx + slot] = token_count;
394                    }
395
396                    // Fast-field: store raw text for text ordinal column
397                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
398                        ff.add_text(doc_id, text);
399                    }
400                }
401                (FieldType::U64, FieldValue::U64(v)) => {
402                    self.index_numeric_field(*field, doc_id, *v)?;
403                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
404                        ff.add_u64(doc_id, *v);
405                    }
406                }
407                (FieldType::I64, FieldValue::I64(v)) => {
408                    self.index_numeric_field(*field, doc_id, *v as u64)?;
409                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
410                        ff.add_i64(doc_id, *v);
411                    }
412                }
413                (FieldType::F64, FieldValue::F64(v)) => {
414                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
415                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
416                        ff.add_f64(doc_id, *v);
417                    }
418                }
419                (FieldType::DenseVector, FieldValue::DenseVector(vec))
420                    if entry.indexed || entry.stored =>
421                {
422                    let ordinal = self.next_element_ordinal(field.0);
423                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
424                }
425                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
426                    let ordinal = self.next_element_ordinal(field.0);
427                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
428                }
429                _ => {}
430            }
431        }
432
433        // Stream document to disk immediately
434        self.write_document_to_store(&doc)?;
435
436        Ok(doc_id)
437    }
438
439    /// Index a text field using interned terms
440    ///
441    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
442    /// otherwise falls back to an inline zero-allocation path (split_whitespace
443    /// + lowercase + strip non-alphanumeric).
444    ///
445    /// If position recording is enabled for this field, also records token positions
446    /// encoded as (element_ordinal << 20) | token_position.
447    fn index_text_field(
448        &mut self,
449        field: Field,
450        doc_id: DocId,
451        text: &str,
452        element_ordinal: u32,
453    ) -> Result<u32> {
454        use crate::dsl::PositionMode;
455
456        let field_id = field.0;
457        let position_mode = self
458            .position_enabled_fields
459            .get(&field_id)
460            .copied()
461            .flatten();
462
463        // Phase 1: Aggregate term frequencies within this document
464        // Also collect positions if enabled
465        // Reuse buffers to avoid allocations
466        self.local_tf_buffer.clear();
467        // Clear position Vecs in-place (keeps allocated capacity for reuse)
468        for v in self.local_positions.values_mut() {
469            v.clear();
470        }
471
472        let mut token_position = 0u32;
473
474        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
475        // The owned Vec<Token> is computed first so the immutable borrow of
476        // self.tokenizers ends before we mutate other fields.
477        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
478
479        if let Some(tokens) = custom_tokens {
480            // Custom tokenizer path
481            for token in &tokens {
482                let is_new_string = !self.term_interner.contains(&token.text);
483                let term_spur = self.term_interner.get_or_intern(&token.text);
484                if is_new_string {
485                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
486                }
487                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
488
489                if let Some(mode) = position_mode {
490                    let encoded_pos = match mode {
491                        PositionMode::Ordinal => element_ordinal << 20,
492                        PositionMode::TokenPosition => token.position,
493                        PositionMode::Full => (element_ordinal << 20) | token.position,
494                    };
495                    self.local_positions
496                        .entry(term_spur)
497                        .or_default()
498                        .push(encoded_pos);
499                }
500            }
501            token_position = tokens.len() as u32;
502        } else {
503            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
504            for word in text.split_whitespace() {
505                self.token_buffer.clear();
506                for c in word.chars() {
507                    if c.is_alphanumeric() {
508                        for lc in c.to_lowercase() {
509                            self.token_buffer.push(lc);
510                        }
511                    }
512                }
513
514                if self.token_buffer.is_empty() {
515                    continue;
516                }
517
518                let is_new_string = !self.term_interner.contains(&self.token_buffer);
519                let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
520                if is_new_string {
521                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
522                }
523                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
524
525                if let Some(mode) = position_mode {
526                    let encoded_pos = match mode {
527                        PositionMode::Ordinal => element_ordinal << 20,
528                        PositionMode::TokenPosition => token_position,
529                        PositionMode::Full => (element_ordinal << 20) | token_position,
530                    };
531                    self.local_positions
532                        .entry(term_spur)
533                        .or_default()
534                        .push(encoded_pos);
535                }
536
537                token_position += 1;
538            }
539        }
540
541        // Phase 2: Insert aggregated terms into inverted index
542        // Now we only do one inverted_index lookup per unique term in doc
543        for (&term_spur, &tf) in &self.local_tf_buffer {
544            let term_key = TermKey {
545                field: field_id,
546                term: term_spur,
547            };
548
549            let is_new_term = !self.inverted_index.contains_key(&term_key);
550            let posting = self
551                .inverted_index
552                .entry(term_key)
553                .or_insert_with(PostingListBuilder::new);
554            posting.add(doc_id, tf);
555
556            self.estimated_memory += size_of::<CompactPosting>();
557            if is_new_term {
558                self.estimated_memory += NEW_TERM_OVERHEAD;
559            }
560
561            if position_mode.is_some()
562                && let Some(positions) = self.local_positions.get(&term_spur)
563            {
564                let is_new_pos_term = !self.position_index.contains_key(&term_key);
565                let pos_posting = self
566                    .position_index
567                    .entry(term_key)
568                    .or_insert_with(PositionPostingListBuilder::new);
569                for &pos in positions {
570                    pos_posting.add_position(doc_id, pos);
571                }
572                self.estimated_memory += positions.len() * size_of::<u32>();
573                if is_new_pos_term {
574                    self.estimated_memory += NEW_POS_TERM_OVERHEAD;
575                }
576            }
577        }
578
579        Ok(token_position)
580    }
581
582    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
583        use std::fmt::Write;
584
585        self.numeric_buffer.clear();
586        write!(self.numeric_buffer, "__num_{}", value).unwrap();
587        let is_new_string = !self.term_interner.contains(&self.numeric_buffer);
588        let term_spur = self.term_interner.get_or_intern(&self.numeric_buffer);
589
590        let term_key = TermKey {
591            field: field.0,
592            term: term_spur,
593        };
594
595        let is_new_term = !self.inverted_index.contains_key(&term_key);
596        let posting = self
597            .inverted_index
598            .entry(term_key)
599            .or_insert_with(PostingListBuilder::new);
600        posting.add(doc_id, 1);
601
602        self.estimated_memory += size_of::<CompactPosting>();
603        if is_new_term {
604            self.estimated_memory += NEW_TERM_OVERHEAD;
605        }
606        if is_new_string {
607            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
608        }
609
610        Ok(())
611    }
612
613    /// Index a dense vector field with ordinal tracking
614    fn index_dense_vector_field(
615        &mut self,
616        field: Field,
617        doc_id: DocId,
618        ordinal: u16,
619        vector: &[f32],
620    ) -> Result<()> {
621        let dim = vector.len();
622
623        let builder = self
624            .dense_vectors
625            .entry(field.0)
626            .or_insert_with(|| DenseVectorBuilder::new(dim));
627
628        // Verify dimension consistency
629        if builder.dim != dim && builder.len() > 0 {
630            return Err(crate::Error::Schema(format!(
631                "Dense vector dimension mismatch: expected {}, got {}",
632                builder.dim, dim
633            )));
634        }
635
636        builder.add(doc_id, ordinal, vector);
637
638        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
639
640        Ok(())
641    }
642
643    /// Index a sparse vector field using dedicated sparse posting lists
644    ///
645    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
646    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
647    ///
648    /// Weights below the configured `weight_threshold` are not indexed.
649    fn index_sparse_vector_field(
650        &mut self,
651        field: Field,
652        doc_id: DocId,
653        ordinal: u16,
654        entries: &[(u32, f32)],
655    ) -> Result<()> {
656        // Get weight threshold from field config (default 0.0 = no filtering)
657        let weight_threshold = self
658            .schema
659            .get_field_entry(field)
660            .and_then(|entry| entry.sparse_vector_config.as_ref())
661            .map(|config| config.weight_threshold)
662            .unwrap_or(0.0);
663
664        let builder = self
665            .sparse_vectors
666            .entry(field.0)
667            .or_insert_with(SparseVectorBuilder::new);
668
669        for &(dim_id, weight) in entries {
670            // Skip weights below threshold
671            if weight.abs() < weight_threshold {
672                continue;
673            }
674
675            let is_new_dim = !builder.postings.contains_key(&dim_id);
676            builder.add(dim_id, doc_id, ordinal, weight);
677            self.estimated_memory += size_of::<(DocId, u16, f32)>();
678            if is_new_dim {
679                // HashMap entry overhead + Vec header
680                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
681            }
682        }
683
684        Ok(())
685    }
686
687    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
688    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
689        use byteorder::{LittleEndian, WriteBytesExt};
690
691        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
692
693        self.store_file
694            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
695        self.store_file.write_all(&self.doc_serialize_buffer)?;
696
697        Ok(())
698    }
699
700    /// Build the final segment
701    ///
702    /// Streams all data directly to disk via StreamingWriter to avoid buffering
703    /// entire serialized outputs in memory. Each phase consumes and drops its
704    /// source data before the next phase begins.
705    pub async fn build<D: Directory + DirectoryWriter>(
706        mut self,
707        dir: &D,
708        segment_id: SegmentId,
709        trained: Option<&super::TrainedVectorStructures>,
710    ) -> Result<SegmentMeta> {
711        // Flush any buffered data
712        self.store_file.flush()?;
713
714        let files = SegmentFiles::new(segment_id.0);
715
716        // Phase 1: Stream positions directly to disk (consumes position_index)
717        let position_index = std::mem::take(&mut self.position_index);
718        let position_offsets = if !position_index.is_empty() {
719            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
720            let offsets = postings::build_positions_streaming(
721                position_index,
722                &self.term_interner,
723                &mut *pos_writer,
724            )?;
725            pos_writer.finish()?;
726            offsets
727        } else {
728            FxHashMap::default()
729        };
730
731        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
732        // These are fully independent: different source data, different output files.
733        let inverted_index = std::mem::take(&mut self.inverted_index);
734        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
735        let store_path = self.store_path.clone();
736        let num_compression_threads = self.config.num_compression_threads;
737        let compression_level = self.config.compression_level;
738        let dense_vectors = std::mem::take(&mut self.dense_vectors);
739        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
740        let schema = &self.schema;
741
742        // Pre-create all streaming writers (async) before entering sync rayon scope
743        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
744        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
745        let mut store_writer = dir.streaming_writer(&files.store).await?;
746        let mut vectors_writer = if !dense_vectors.is_empty() {
747            Some(dir.streaming_writer(&files.vectors).await?)
748        } else {
749            None
750        };
751        let mut sparse_writer = if !sparse_vectors.is_empty() {
752            Some(dir.streaming_writer(&files.sparse).await?)
753        } else {
754            None
755        };
756        let mut fast_fields = std::mem::take(&mut self.fast_fields);
757        let num_docs = self.next_doc_id;
758        let mut fast_writer = if !fast_fields.is_empty() {
759            Some(dir.streaming_writer(&files.fast).await?)
760        } else {
761            None
762        };
763
764        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
765            rayon::join(
766                || {
767                    rayon::join(
768                        || {
769                            postings::build_postings_streaming(
770                                inverted_index,
771                                term_interner,
772                                &position_offsets,
773                                &mut *term_dict_writer,
774                                &mut *postings_writer,
775                            )
776                        },
777                        || {
778                            store::build_store_streaming(
779                                &store_path,
780                                num_compression_threads,
781                                compression_level,
782                                &mut *store_writer,
783                                num_docs,
784                            )
785                        },
786                    )
787                },
788                || {
789                    rayon::join(
790                        || {
791                            rayon::join(
792                                || -> Result<()> {
793                                    if let Some(ref mut w) = vectors_writer {
794                                        dense::build_vectors_streaming(
795                                            dense_vectors,
796                                            schema,
797                                            trained,
798                                            &mut **w,
799                                        )?;
800                                    }
801                                    Ok(())
802                                },
803                                || -> Result<()> {
804                                    if let Some(ref mut w) = sparse_writer {
805                                        sparse::build_sparse_streaming(
806                                            &mut sparse_vectors,
807                                            schema,
808                                            &mut **w,
809                                        )?;
810                                    }
811                                    Ok(())
812                                },
813                            )
814                        },
815                        || -> Result<()> {
816                            if let Some(ref mut w) = fast_writer {
817                                build_fast_fields_streaming(&mut fast_fields, num_docs, &mut **w)?;
818                            }
819                            Ok(())
820                        },
821                    )
822                },
823            );
824        postings_result?;
825        store_result?;
826        vectors_result?;
827        sparse_result?;
828        fast_result?;
829        term_dict_writer.finish()?;
830        postings_writer.finish()?;
831        store_writer.finish()?;
832        if let Some(w) = vectors_writer {
833            w.finish()?;
834        }
835        if let Some(w) = sparse_writer {
836            w.finish()?;
837        }
838        if let Some(w) = fast_writer {
839            w.finish()?;
840        }
841        drop(position_offsets);
842        drop(sparse_vectors);
843
844        let meta = SegmentMeta {
845            id: segment_id.0,
846            num_docs: self.next_doc_id,
847            field_stats: self.field_stats.clone(),
848        };
849
850        dir.write(&files.meta, &meta.serialize()?).await?;
851
852        // Cleanup temp files
853        let _ = std::fs::remove_file(&self.store_path);
854
855        Ok(meta)
856    }
857}
858
859/// Serialize all fast-field columns to a `.fast` file.
860fn build_fast_fields_streaming(
861    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
862    num_docs: u32,
863    writer: &mut dyn Write,
864) -> Result<()> {
865    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
866
867    if fast_fields.is_empty() {
868        return Ok(());
869    }
870
871    // Sort fields by id for deterministic output
872    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
873    field_ids.sort_unstable();
874
875    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
876    let mut current_offset = 0u64;
877
878    for &field_id in &field_ids {
879        let ff = fast_fields.get_mut(&field_id).unwrap();
880        ff.pad_to(num_docs);
881
882        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
883        toc.field_id = field_id;
884        current_offset += bytes_written;
885        toc_entries.push(toc);
886    }
887
888    // Write TOC + footer
889    let toc_offset = current_offset;
890    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
891
892    log::debug!(
893        "[fast-fields] wrote {} columns, {} docs, toc at offset {}",
894        toc_entries.len(),
895        num_docs,
896        toc_offset
897    );
898
899    Ok(())
900}
901
902impl Drop for SegmentBuilder {
903    fn drop(&mut self) {
904        // Cleanup temp files on drop
905        let _ = std::fs::remove_file(&self.store_path);
906    }
907}