Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16mod postings;
17pub(crate) mod simhash;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23use std::fs::{File, OpenOptions};
24use std::io::{BufWriter, Write};
25use std::mem::size_of;
26use std::path::PathBuf;
27
28use hashbrown::HashMap;
29use lasso::{Rodeo, Spur};
30use rustc_hash::FxHashMap;
31
32use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
33use std::sync::Arc;
34
35use crate::directories::{Directory, DirectoryWriter};
36use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
37use crate::tokenizer::BoxedTokenizer;
38use crate::{DocId, Result};
39
40use dense::DenseVectorBuilder;
41use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
42use sparse::SparseVectorBuilder;
43
44/// Size of the document store buffer before writing to disk
45const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
46
47/// Memory overhead per new term in the inverted index:
48/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
49const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
50
51/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
52const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
53
54/// Memory overhead per new term in the position index
55const NEW_POS_TERM_OVERHEAD: usize =
56    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
57
58/// Segment builder with optimized memory usage
59///
60/// Features:
61/// - Streams documents to disk immediately (no in-memory document storage)
62/// - Uses string interning for terms (reduced allocations)
63/// - Uses hashbrown HashMap (faster than BTreeMap)
64pub struct SegmentBuilder {
65    schema: Arc<Schema>,
66    config: SegmentBuilderConfig,
67    tokenizers: FxHashMap<Field, BoxedTokenizer>,
68
69    /// String interner for terms - O(1) lookup and deduplication
70    term_interner: Rodeo,
71
72    /// Inverted index: term key -> posting list
73    inverted_index: HashMap<TermKey, PostingListBuilder>,
74
75    /// Streaming document store writer
76    store_file: BufWriter<File>,
77    store_path: PathBuf,
78
79    /// Document count
80    next_doc_id: DocId,
81
82    /// Per-field statistics for BM25F
83    field_stats: FxHashMap<u32, FieldStats>,
84
85    /// Per-document field lengths stored compactly
86    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
87    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
88    doc_field_lengths: Vec<u32>,
89    num_indexed_fields: usize,
90    field_to_slot: FxHashMap<u32, usize>,
91
92    /// Reusable buffer for per-document term frequency aggregation
93    /// Avoids allocating a new hashmap for each document
94    local_tf_buffer: FxHashMap<Spur, u32>,
95
96    /// Reusable buffer for per-document position tracking (when positions enabled)
97    /// Avoids allocating a new hashmap for each text field per document
98    local_positions: FxHashMap<Spur, Vec<u32>>,
99
100    /// Reusable buffer for tokenization to avoid per-token String allocations
101    token_buffer: String,
102
103    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
104    numeric_buffer: String,
105
106    /// Dense vector storage per field: field -> (doc_ids, vectors)
107    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
108    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
109
110    /// Sparse vector storage per field: field -> SparseVectorBuilder
111    /// Uses proper BlockSparsePostingList with configurable quantization
112    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
113
114    /// Position index for fields with positions enabled
115    /// term key -> position posting list
116    position_index: HashMap<TermKey, PositionPostingListBuilder>,
117
118    /// Fields that have position tracking enabled, with their mode
119    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
120
121    /// Current element ordinal for multi-valued fields (reset per document)
122    current_element_ordinal: FxHashMap<u32, u32>,
123
124    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
125    estimated_memory: usize,
126
127    /// Reusable buffer for document serialization (avoids per-document allocation)
128    doc_serialize_buffer: Vec<u8>,
129
130    /// Fast-field columnar writers per field_id (only for fields with fast=true)
131    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
132}
133
134impl SegmentBuilder {
135    /// Create a new segment builder
136    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
137        let segment_id = uuid::Uuid::new_v4();
138        let store_path = config
139            .temp_dir
140            .join(format!("hermes_store_{}.tmp", segment_id));
141
142        let store_file = BufWriter::with_capacity(
143            STORE_BUFFER_SIZE,
144            OpenOptions::new()
145                .create(true)
146                .write(true)
147                .truncate(true)
148                .open(&store_path)?,
149        );
150
151        // Count indexed fields, track positions, and auto-configure tokenizers
152        let registry = crate::tokenizer::TokenizerRegistry::new();
153        let mut num_indexed_fields = 0;
154        let mut field_to_slot = FxHashMap::default();
155        let mut position_enabled_fields = FxHashMap::default();
156        let mut tokenizers = FxHashMap::default();
157        for (field, entry) in schema.fields() {
158            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
159                field_to_slot.insert(field.0, num_indexed_fields);
160                num_indexed_fields += 1;
161                if entry.positions.is_some() {
162                    position_enabled_fields.insert(field.0, entry.positions);
163                }
164                if let Some(ref tok_name) = entry.tokenizer
165                    && let Some(tokenizer) = registry.get(tok_name)
166                {
167                    tokenizers.insert(field, tokenizer);
168                }
169            }
170        }
171
172        // Initialize fast-field writers for fields with fast=true
173        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
174        let mut fast_fields = FxHashMap::default();
175        for (field, entry) in schema.fields() {
176            if entry.fast {
177                let writer = if entry.multi {
178                    match entry.field_type {
179                        FieldType::U64 => {
180                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
181                        }
182                        FieldType::I64 => {
183                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
184                        }
185                        FieldType::F64 => {
186                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
187                        }
188                        FieldType::Text => FastFieldWriter::new_text_multi(),
189                        _ => continue,
190                    }
191                } else {
192                    match entry.field_type {
193                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
194                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
195                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
196                        FieldType::Text => FastFieldWriter::new_text(),
197                        _ => continue,
198                    }
199                };
200                fast_fields.insert(field.0, writer);
201            }
202        }
203
204        // Auto-create simhash satellite column (U64) for BMP block reordering
205        for (field, entry) in schema.fields() {
206            if entry.simhash && entry.field_type == FieldType::SparseVector {
207                fast_fields
208                    .entry(field.0)
209                    .or_insert_with(|| FastFieldWriter::new_numeric(FastFieldColumnType::U64));
210            }
211        }
212
213        Ok(Self {
214            schema,
215            tokenizers,
216            term_interner: Rodeo::new(),
217            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
218            store_file,
219            store_path,
220            next_doc_id: 0,
221            field_stats: FxHashMap::default(),
222            doc_field_lengths: Vec::new(),
223            num_indexed_fields,
224            field_to_slot,
225            local_tf_buffer: FxHashMap::default(),
226            local_positions: FxHashMap::default(),
227            token_buffer: String::with_capacity(64),
228            numeric_buffer: String::with_capacity(32),
229            config,
230            dense_vectors: FxHashMap::default(),
231            sparse_vectors: FxHashMap::default(),
232            position_index: HashMap::new(),
233            position_enabled_fields,
234            current_element_ordinal: FxHashMap::default(),
235            estimated_memory: 0,
236            doc_serialize_buffer: Vec::with_capacity(256),
237            fast_fields,
238        })
239    }
240
241    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
242        self.tokenizers.insert(field, tokenizer);
243    }
244
245    /// Get the current element ordinal for a field and increment it.
246    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
247    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
248        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
249        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
250        ordinal
251    }
252
253    pub fn num_docs(&self) -> u32 {
254        self.next_doc_id
255    }
256
257    /// Fast O(1) memory estimate - updated incrementally during indexing
258    #[inline]
259    pub fn estimated_memory_bytes(&self) -> usize {
260        self.estimated_memory
261    }
262
263    /// Count total unique sparse dimensions across all fields
264    pub fn sparse_dim_count(&self) -> usize {
265        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
266    }
267
268    /// Get current statistics for debugging performance (expensive - iterates all data)
269    pub fn stats(&self) -> SegmentBuilderStats {
270        use std::mem::size_of;
271
272        let postings_in_memory: usize =
273            self.inverted_index.values().map(|p| p.postings.len()).sum();
274
275        // Size constants computed from actual types
276        let compact_posting_size = size_of::<CompactPosting>();
277        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
278        let term_key_size = size_of::<TermKey>();
279        let posting_builder_size = size_of::<PostingListBuilder>();
280        let spur_size = size_of::<lasso::Spur>();
281        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
282
283        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
284        // Measured: ~(key_size + value_size + 8) per entry on average
285        let hashmap_entry_base_overhead = 8usize;
286
287        // FxHashMap uses same layout as hashbrown
288        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
289
290        // Postings memory
291        let postings_bytes: usize = self
292            .inverted_index
293            .values()
294            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
295            .sum();
296
297        // Inverted index overhead
298        let index_overhead_bytes = self.inverted_index.len()
299            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
300
301        // Term interner: Rodeo stores strings + metadata
302        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
303        let interner_arena_overhead = 2 * size_of::<usize>();
304        let avg_term_len = 8; // Estimated average term length
305        let interner_bytes =
306            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
307
308        // Doc field lengths
309        let field_lengths_bytes =
310            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
311
312        // Dense vectors
313        let mut dense_vectors_bytes: usize = 0;
314        let mut dense_vector_count: usize = 0;
315        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
316        for b in self.dense_vectors.values() {
317            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
318                + b.doc_ids.capacity() * doc_id_ordinal_size
319                + 2 * vec_overhead; // Two Vecs
320            dense_vector_count += b.doc_ids.len();
321        }
322
323        // Local buffers
324        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
325        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
326
327        // Sparse vectors
328        let mut sparse_vectors_bytes: usize = 0;
329        for builder in self.sparse_vectors.values() {
330            for postings in builder.postings.values() {
331                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
332            }
333            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
334            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
335            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
336        }
337        // Outer FxHashMap overhead
338        let outer_sparse_entry_size =
339            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
340        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
341
342        // Position index
343        let mut position_index_bytes: usize = 0;
344        for pos_builder in self.position_index.values() {
345            for (_, positions) in &pos_builder.postings {
346                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
347            }
348            // Vec<(DocId, Vec<u32>)> entry size
349            let pos_entry_size = size_of::<DocId>() + vec_overhead;
350            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
351        }
352        // HashMap overhead for position_index
353        let pos_index_entry_size =
354            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
355        position_index_bytes += self.position_index.len() * pos_index_entry_size;
356
357        let estimated_memory_bytes = postings_bytes
358            + index_overhead_bytes
359            + interner_bytes
360            + field_lengths_bytes
361            + dense_vectors_bytes
362            + local_tf_buffer_bytes
363            + sparse_vectors_bytes
364            + position_index_bytes;
365
366        let memory_breakdown = MemoryBreakdown {
367            postings_bytes,
368            index_overhead_bytes,
369            interner_bytes,
370            field_lengths_bytes,
371            dense_vectors_bytes,
372            dense_vector_count,
373            sparse_vectors_bytes,
374            position_index_bytes,
375        };
376
377        SegmentBuilderStats {
378            num_docs: self.next_doc_id,
379            unique_terms: self.inverted_index.len(),
380            postings_in_memory,
381            interned_strings: self.term_interner.len(),
382            doc_field_lengths_size: self.doc_field_lengths.len(),
383            estimated_memory_bytes,
384            memory_breakdown,
385        }
386    }
387
388    /// Add a document - streams to disk immediately
389    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
390        let doc_id = self.next_doc_id;
391        self.next_doc_id += 1;
392
393        // Initialize field lengths for this document
394        let base_idx = self.doc_field_lengths.len();
395        self.doc_field_lengths
396            .resize(base_idx + self.num_indexed_fields, 0);
397        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
398
399        // Reset element ordinals for this document (for multi-valued fields)
400        self.current_element_ordinal.clear();
401
402        for (field, value) in doc.field_values() {
403            let Some(entry) = self.schema.get_field_entry(*field) else {
404                continue;
405            };
406
407            // Dense vectors are written to .vectors when indexed || stored
408            // Other field types require indexed or fast
409            if !matches!(&entry.field_type, FieldType::DenseVector) && !entry.indexed && !entry.fast
410            {
411                continue;
412            }
413
414            match (&entry.field_type, value) {
415                (FieldType::Text, FieldValue::Text(text)) => {
416                    if entry.indexed {
417                        let element_ordinal = self.next_element_ordinal(field.0);
418                        let token_count =
419                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
420
421                        let stats = self.field_stats.entry(field.0).or_default();
422                        stats.total_tokens += token_count as u64;
423                        if element_ordinal == 0 {
424                            stats.doc_count += 1;
425                        }
426
427                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
428                            self.doc_field_lengths[base_idx + slot] = token_count;
429                        }
430                    }
431
432                    // Fast-field: store raw text for text ordinal column
433                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
434                        ff.add_text(doc_id, text);
435                    }
436                }
437                (FieldType::U64, FieldValue::U64(v)) => {
438                    if entry.indexed {
439                        self.index_numeric_field(*field, doc_id, *v)?;
440                    }
441                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
442                        ff.add_u64(doc_id, *v);
443                    }
444                }
445                (FieldType::I64, FieldValue::I64(v)) => {
446                    if entry.indexed {
447                        self.index_numeric_field(*field, doc_id, *v as u64)?;
448                    }
449                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
450                        ff.add_i64(doc_id, *v);
451                    }
452                }
453                (FieldType::F64, FieldValue::F64(v)) => {
454                    if entry.indexed {
455                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
456                    }
457                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
458                        ff.add_f64(doc_id, *v);
459                    }
460                }
461                (FieldType::DenseVector, FieldValue::DenseVector(vec))
462                    if entry.indexed || entry.stored =>
463                {
464                    let ordinal = self.next_element_ordinal(field.0);
465                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
466                }
467                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
468                    let has_simhash = entry.simhash;
469                    let ordinal = self.next_element_ordinal(field.0);
470                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
471                    // Auto-compute simhash from first vector (ordinal 0)
472                    if ordinal == 0
473                        && has_simhash
474                        && let Some(ff) = self.fast_fields.get_mut(&field.0)
475                    {
476                        ff.add_u64(doc_id, simhash::simhash_from_sparse_vector(entries));
477                    }
478                }
479                _ => {}
480            }
481        }
482
483        // Stream document to disk immediately
484        self.write_document_to_store(&doc)?;
485
486        Ok(doc_id)
487    }
488
489    /// Index a text field using interned terms
490    ///
491    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
492    /// otherwise falls back to an inline zero-allocation path (split_whitespace
493    /// + lowercase + strip non-alphanumeric).
494    ///
495    /// If position recording is enabled for this field, also records token positions
496    /// encoded as (element_ordinal << 20) | token_position.
497    fn index_text_field(
498        &mut self,
499        field: Field,
500        doc_id: DocId,
501        text: &str,
502        element_ordinal: u32,
503    ) -> Result<u32> {
504        use crate::dsl::PositionMode;
505
506        let field_id = field.0;
507        let position_mode = self
508            .position_enabled_fields
509            .get(&field_id)
510            .copied()
511            .flatten();
512
513        // Phase 1: Aggregate term frequencies within this document
514        // Also collect positions if enabled
515        // Reuse buffers to avoid allocations
516        self.local_tf_buffer.clear();
517        // Clear position Vecs in-place (keeps allocated capacity for reuse)
518        for v in self.local_positions.values_mut() {
519            v.clear();
520        }
521
522        let mut token_position = 0u32;
523
524        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
525        // The owned Vec<Token> is computed first so the immutable borrow of
526        // self.tokenizers ends before we mutate other fields.
527        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
528
529        if let Some(tokens) = custom_tokens {
530            // Custom tokenizer path
531            for token in &tokens {
532                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
533                    spur
534                } else {
535                    let spur = self.term_interner.get_or_intern(&token.text);
536                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
537                    spur
538                };
539                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
540
541                if let Some(mode) = position_mode {
542                    let encoded_pos = match mode {
543                        PositionMode::Ordinal => element_ordinal << 20,
544                        PositionMode::TokenPosition => token.position,
545                        PositionMode::Full => (element_ordinal << 20) | token.position,
546                    };
547                    self.local_positions
548                        .entry(term_spur)
549                        .or_default()
550                        .push(encoded_pos);
551                }
552            }
553            token_position = tokens.len() as u32;
554        } else {
555            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
556            for word in text.split_whitespace() {
557                self.token_buffer.clear();
558                for c in word.chars() {
559                    if c.is_alphanumeric() {
560                        for lc in c.to_lowercase() {
561                            self.token_buffer.push(lc);
562                        }
563                    }
564                }
565
566                if self.token_buffer.is_empty() {
567                    continue;
568                }
569
570                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
571                    spur
572                } else {
573                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
574                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
575                    spur
576                };
577                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
578
579                if let Some(mode) = position_mode {
580                    let encoded_pos = match mode {
581                        PositionMode::Ordinal => element_ordinal << 20,
582                        PositionMode::TokenPosition => token_position,
583                        PositionMode::Full => (element_ordinal << 20) | token_position,
584                    };
585                    self.local_positions
586                        .entry(term_spur)
587                        .or_default()
588                        .push(encoded_pos);
589                }
590
591                token_position += 1;
592            }
593        }
594
595        // Phase 2: Insert aggregated terms into inverted index
596        // Now we only do one inverted_index lookup per unique term in doc
597        for (&term_spur, &tf) in &self.local_tf_buffer {
598            let term_key = TermKey {
599                field: field_id,
600                term: term_spur,
601            };
602
603            match self.inverted_index.entry(term_key) {
604                hashbrown::hash_map::Entry::Occupied(mut o) => {
605                    o.get_mut().add(doc_id, tf);
606                    self.estimated_memory += size_of::<CompactPosting>();
607                }
608                hashbrown::hash_map::Entry::Vacant(v) => {
609                    let mut posting = PostingListBuilder::new();
610                    posting.add(doc_id, tf);
611                    v.insert(posting);
612                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
613                }
614            }
615
616            if position_mode.is_some()
617                && let Some(positions) = self.local_positions.get(&term_spur)
618            {
619                match self.position_index.entry(term_key) {
620                    hashbrown::hash_map::Entry::Occupied(mut o) => {
621                        for &pos in positions {
622                            o.get_mut().add_position(doc_id, pos);
623                        }
624                        self.estimated_memory += positions.len() * size_of::<u32>();
625                    }
626                    hashbrown::hash_map::Entry::Vacant(v) => {
627                        let mut pos_posting = PositionPostingListBuilder::new();
628                        for &pos in positions {
629                            pos_posting.add_position(doc_id, pos);
630                        }
631                        self.estimated_memory +=
632                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
633                        v.insert(pos_posting);
634                    }
635                }
636            }
637        }
638
639        Ok(token_position)
640    }
641
642    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
643        use std::fmt::Write;
644
645        self.numeric_buffer.clear();
646        write!(self.numeric_buffer, "__num_{}", value).unwrap();
647        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
648            spur
649        } else {
650            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
651            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
652            spur
653        };
654
655        let term_key = TermKey {
656            field: field.0,
657            term: term_spur,
658        };
659
660        match self.inverted_index.entry(term_key) {
661            hashbrown::hash_map::Entry::Occupied(mut o) => {
662                o.get_mut().add(doc_id, 1);
663                self.estimated_memory += size_of::<CompactPosting>();
664            }
665            hashbrown::hash_map::Entry::Vacant(v) => {
666                let mut posting = PostingListBuilder::new();
667                posting.add(doc_id, 1);
668                v.insert(posting);
669                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
670            }
671        }
672
673        Ok(())
674    }
675
676    /// Index a dense vector field with ordinal tracking
677    fn index_dense_vector_field(
678        &mut self,
679        field: Field,
680        doc_id: DocId,
681        ordinal: u16,
682        vector: &[f32],
683    ) -> Result<()> {
684        let dim = vector.len();
685
686        let builder = self
687            .dense_vectors
688            .entry(field.0)
689            .or_insert_with(|| DenseVectorBuilder::new(dim));
690
691        // Verify dimension consistency
692        if builder.dim != dim && builder.len() > 0 {
693            return Err(crate::Error::Schema(format!(
694                "Dense vector dimension mismatch: expected {}, got {}",
695                builder.dim, dim
696            )));
697        }
698
699        builder.add(doc_id, ordinal, vector);
700
701        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
702
703        Ok(())
704    }
705
706    /// Index a sparse vector field using dedicated sparse posting lists
707    ///
708    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
709    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
710    ///
711    /// Weights below the configured `weight_threshold` are not indexed.
712    fn index_sparse_vector_field(
713        &mut self,
714        field: Field,
715        doc_id: DocId,
716        ordinal: u16,
717        entries: &[(u32, f32)],
718    ) -> Result<()> {
719        // Get weight threshold from field config (default 0.0 = no filtering)
720        let weight_threshold = self
721            .schema
722            .get_field_entry(field)
723            .and_then(|entry| entry.sparse_vector_config.as_ref())
724            .map(|config| config.weight_threshold)
725            .unwrap_or(0.0);
726
727        let builder = self
728            .sparse_vectors
729            .entry(field.0)
730            .or_insert_with(SparseVectorBuilder::new);
731
732        builder.inc_vector_count();
733
734        for &(dim_id, weight) in entries {
735            // Skip weights below threshold
736            if weight.abs() < weight_threshold {
737                continue;
738            }
739
740            let is_new_dim = !builder.postings.contains_key(&dim_id);
741            builder.add(dim_id, doc_id, ordinal, weight);
742            self.estimated_memory += size_of::<(DocId, u16, f32)>();
743            if is_new_dim {
744                // HashMap entry overhead + Vec header
745                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
746            }
747        }
748
749        Ok(())
750    }
751
752    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
753    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
754        use byteorder::{LittleEndian, WriteBytesExt};
755
756        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
757
758        self.store_file
759            .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
760        self.store_file.write_all(&self.doc_serialize_buffer)?;
761
762        Ok(())
763    }
764
765    /// Build the final segment
766    ///
767    /// Streams all data directly to disk via StreamingWriter to avoid buffering
768    /// entire serialized outputs in memory. Each phase consumes and drops its
769    /// source data before the next phase begins.
770    pub async fn build<D: Directory + DirectoryWriter>(
771        mut self,
772        dir: &D,
773        segment_id: SegmentId,
774        trained: Option<&super::TrainedVectorStructures>,
775    ) -> Result<SegmentMeta> {
776        // Flush any buffered data
777        self.store_file.flush()?;
778
779        let files = SegmentFiles::new(segment_id.0);
780
781        // Phase 1: Stream positions directly to disk (consumes position_index)
782        let position_index = std::mem::take(&mut self.position_index);
783        let position_offsets = if !position_index.is_empty() {
784            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
785            let offsets = postings::build_positions_streaming(
786                position_index,
787                &self.term_interner,
788                &mut *pos_writer,
789            )?;
790            pos_writer.finish()?;
791            offsets
792        } else {
793            FxHashMap::default()
794        };
795
796        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
797        // These are fully independent: different source data, different output files.
798        let inverted_index = std::mem::take(&mut self.inverted_index);
799        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
800        let store_path = self.store_path.clone();
801        let num_compression_threads = self.config.num_compression_threads;
802        let compression_level = self.config.compression_level;
803        let dense_vectors = std::mem::take(&mut self.dense_vectors);
804        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
805        let schema = &self.schema;
806
807        // Pre-create all streaming writers (async) before entering sync rayon scope
808        // Wrapped in OffsetWriter to track bytes written per phase.
809        let mut term_dict_writer =
810            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
811        let mut postings_writer =
812            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
813        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
814        let mut vectors_writer = if !dense_vectors.is_empty() {
815            Some(super::OffsetWriter::new(
816                dir.streaming_writer(&files.vectors).await?,
817            ))
818        } else {
819            None
820        };
821        let mut sparse_writer = if !sparse_vectors.is_empty() {
822            Some(super::OffsetWriter::new(
823                dir.streaming_writer(&files.sparse).await?,
824            ))
825        } else {
826            None
827        };
828        let mut fast_fields = std::mem::take(&mut self.fast_fields);
829        let num_docs = self.next_doc_id;
830        let mut fast_writer = if !fast_fields.is_empty() {
831            Some(super::OffsetWriter::new(
832                dir.streaming_writer(&files.fast).await?,
833            ))
834        } else {
835            None
836        };
837
838        let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
839            rayon::join(
840                || {
841                    rayon::join(
842                        || {
843                            postings::build_postings_streaming(
844                                inverted_index,
845                                term_interner,
846                                &position_offsets,
847                                &mut term_dict_writer,
848                                &mut postings_writer,
849                            )
850                        },
851                        || {
852                            store::build_store_streaming(
853                                &store_path,
854                                num_compression_threads,
855                                compression_level,
856                                &mut store_writer,
857                                num_docs,
858                            )
859                        },
860                    )
861                },
862                || {
863                    rayon::join(
864                        || {
865                            rayon::join(
866                                || -> Result<()> {
867                                    if let Some(ref mut w) = vectors_writer {
868                                        dense::build_vectors_streaming(
869                                            dense_vectors,
870                                            schema,
871                                            trained,
872                                            w,
873                                        )?;
874                                    }
875                                    Ok(())
876                                },
877                                || -> Result<()> {
878                                    if let Some(ref mut w) = sparse_writer {
879                                        sparse::build_sparse_streaming(
880                                            &mut sparse_vectors,
881                                            schema,
882                                            w,
883                                        )?;
884                                    }
885                                    Ok(())
886                                },
887                            )
888                        },
889                        || -> Result<()> {
890                            if let Some(ref mut w) = fast_writer {
891                                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
892                            }
893                            Ok(())
894                        },
895                    )
896                },
897            );
898        postings_result?;
899        store_result?;
900        vectors_result?;
901        sparse_result?;
902        fast_result?;
903
904        let term_dict_bytes = term_dict_writer.offset() as usize;
905        let postings_bytes = postings_writer.offset() as usize;
906        let store_bytes = store_writer.offset() as usize;
907        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
908        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
909        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
910
911        term_dict_writer.finish()?;
912        postings_writer.finish()?;
913        store_writer.finish()?;
914        if let Some(w) = vectors_writer {
915            w.finish()?;
916        }
917        if let Some(w) = sparse_writer {
918            w.finish()?;
919        }
920        if let Some(w) = fast_writer {
921            w.finish()?;
922        }
923        drop(position_offsets);
924        drop(sparse_vectors);
925
926        log::info!(
927            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
928            num_docs,
929            super::format_bytes(term_dict_bytes),
930            super::format_bytes(postings_bytes),
931            super::format_bytes(store_bytes),
932            super::format_bytes(vectors_bytes),
933            super::format_bytes(sparse_bytes),
934            super::format_bytes(fast_bytes),
935        );
936
937        let meta = SegmentMeta {
938            id: segment_id.0,
939            num_docs: self.next_doc_id,
940            field_stats: self.field_stats.clone(),
941        };
942
943        dir.write(&files.meta, &meta.serialize()?).await?;
944
945        // Cleanup temp files
946        let _ = std::fs::remove_file(&self.store_path);
947
948        Ok(meta)
949    }
950}
951
952/// Serialize all fast-field columns to a `.fast` file.
953fn build_fast_fields_streaming(
954    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
955    num_docs: u32,
956    writer: &mut dyn Write,
957) -> Result<()> {
958    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
959
960    if fast_fields.is_empty() {
961        return Ok(());
962    }
963
964    // Sort fields by id for deterministic output
965    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
966    field_ids.sort_unstable();
967
968    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
969    let mut current_offset = 0u64;
970
971    for &field_id in &field_ids {
972        let ff = fast_fields.get_mut(&field_id).unwrap();
973        ff.pad_to(num_docs);
974
975        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
976        toc.field_id = field_id;
977        current_offset += bytes_written;
978        toc_entries.push(toc);
979    }
980
981    // Write TOC + footer
982    let toc_offset = current_offset;
983    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
984
985    Ok(())
986}
987
988impl Drop for SegmentBuilder {
989    fn drop(&mut self) {
990        // Cleanup temp files on drop
991        let _ = std::fs::remove_file(&self.store_path);
992    }
993}