Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11#[cfg_attr(not(feature = "native"), allow(dead_code))]
12pub(crate) mod bmp;
13mod config;
14mod dense;
15#[cfg(feature = "diagnostics")]
16mod diagnostics;
17#[cfg_attr(not(feature = "native"), allow(dead_code))]
18pub(crate) mod graph_bisection;
19mod postings;
20mod sparse;
21mod store;
22
23pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
24
25#[cfg(feature = "native")]
26use std::fs::{File, OpenOptions};
27#[cfg(feature = "native")]
28use std::io::BufWriter;
29use std::io::Write;
30use std::mem::size_of;
31#[cfg(feature = "native")]
32use std::path::PathBuf;
33
34use hashbrown::HashMap;
35use rustc_hash::FxHashMap;
36
37// String interning: lasso on native (fast arena), HashMap on WASM (no C deps)
38#[cfg(feature = "native")]
39use lasso::{Rodeo, Spur};
40
41#[cfg(not(feature = "native"))]
42pub(crate) mod simple_interner {
43    use hashbrown::HashMap;
44
45    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
46    pub struct Spur(u32);
47
48    /// Simple string interner for WASM (replaces lasso::Rodeo).
49    /// Stores each string once in a Vec; HashMap maps &str → index.
50    pub struct Rodeo {
51        /// Canonical storage — each string lives here exactly once.
52        strings: Vec<Box<str>>,
53        /// Maps borrowed string slices (pointing into `strings`) to their index.
54        /// Safety: entries are never removed and Box<str> has a stable address.
55        map: HashMap<&'static str, u32>,
56    }
57
58    impl Rodeo {
59        pub fn new() -> Self {
60            Self {
61                strings: Vec::new(),
62                map: HashMap::new(),
63            }
64        }
65
66        pub fn get(&self, key: &str) -> Option<Spur> {
67            self.map.get(key).map(|&id| Spur(id))
68        }
69
70        pub fn get_or_intern(&mut self, key: &str) -> Spur {
71            if let Some(&id) = self.map.get(key) {
72                return Spur(id);
73            }
74            let id = self.strings.len() as u32;
75            let boxed: Box<str> = key.into();
76            // Safety: the Box<str> is stored in self.strings (append-only Vec)
77            // and never moved or freed while the Rodeo is alive.
78            let static_ref: &'static str = unsafe { &*(boxed.as_ref() as *const str) };
79            self.strings.push(boxed);
80            self.map.insert(static_ref, id);
81            Spur(id)
82        }
83
84        pub fn resolve(&self, spur: &Spur) -> &str {
85            &self.strings[spur.0 as usize]
86        }
87
88        pub fn len(&self) -> usize {
89            self.strings.len()
90        }
91    }
92}
93
94#[cfg(not(feature = "native"))]
95use simple_interner::{Rodeo, Spur};
96
97use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
98use std::sync::Arc;
99
100use crate::directories::{Directory, DirectoryWriter};
101use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
102use crate::tokenizer::BoxedTokenizer;
103use crate::{DocId, Result};
104
105use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
106use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
107use sparse::SparseVectorBuilder;
108
/// Capacity of the document store buffer (16 MiB).
///
/// On native builds this is the `BufWriter` capacity in front of the temp
/// store file; otherwise it is the initial capacity of the in-memory store.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16 MiB

/// Estimated memory charged for each new term added to the inverted index:
/// the map key (`TermKey`), the map value (`PostingListBuilder`), plus a fixed
/// 24-byte allowance.
/// NOTE(review): the 24 bytes presumably cover the HashMap control byte,
/// padding and Vec bookkeeping mentioned in the original comment — confirm.
const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;

/// Estimated memory charged for each newly interned string, on top of the
/// string bytes themselves: one `Spur` plus two pointer-sized words of
/// interner bookkeeping.
const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();

/// Estimated memory charged for each new term added to the position index:
/// `TermKey` + `PositionPostingListBuilder` + the same fixed 24-byte allowance
/// used by `NEW_TERM_OVERHEAD`.
const NEW_POS_TERM_OVERHEAD: usize =
    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
122
/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams each document to the store as it is added (native: temp file on
///   disk, otherwise an in-memory buffer) instead of retaining `Document` values
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    /// Schema describing the fields of the documents being indexed.
    schema: Arc<Schema>,
    /// Builder configuration (e.g. temp directory, initial posting map capacity).
    config: SegmentBuilderConfig,
    /// Custom tokenizers per field. Fields without an entry use the inline
    /// whitespace/lowercase/alphanumeric tokenization in `index_text_field`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key (field id + interned term) -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer (native: temp file on disk, WASM: in-memory buffer)
    #[cfg(feature = "native")]
    store_file: BufWriter<File>,
    /// Path of the temp store file backing `store_file` (native only).
    #[cfg(feature = "native")]
    store_path: PathBuf,
    /// In-memory document store used when the `native` feature is disabled.
    #[cfg(not(feature = "native"))]
    store_buffer: Vec<u8>,

    /// Id that will be assigned to the next added document; also the number
    /// of documents added so far.
    next_doc_id: DocId,

    /// Per-field statistics for BM25F, keyed by field id
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of length slots per document in `doc_field_lengths`
    /// (one per indexed text field).
    num_indexed_fields: usize,
    /// Field id -> slot index within a document's row of `doc_field_lengths`.
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for per-document position tracking (when positions enabled)
    /// Avoids allocating a new hashmap for each text field per document
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
    numeric_buffer: String,

    /// Dense vector storage per field id: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Binary dense vector storage per field id: field -> packed-bit vectors
    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,

    /// Sparse vector storage per field id: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    /// (copied from the schema entry's `positions` setting)
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    estimated_memory: usize,

    /// Reusable buffer for document serialization (avoids per-document allocation)
    doc_serialize_buffer: Vec<u8>,

    /// Fast-field columnar writers per field_id (only for fields with fast=true)
    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
}
205
206impl SegmentBuilder {
207    /// Create a new segment builder
208    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
209        #[cfg(feature = "native")]
210        let (store_file, store_path) = {
211            let segment_id = uuid::Uuid::new_v4();
212            let store_path = config
213                .temp_dir
214                .join(format!("hermes_store_{}.tmp", segment_id));
215            let store_file = BufWriter::with_capacity(
216                STORE_BUFFER_SIZE,
217                OpenOptions::new()
218                    .create(true)
219                    .write(true)
220                    .truncate(true)
221                    .open(&store_path)?,
222            );
223            (store_file, store_path)
224        };
225
226        // Count indexed fields, track positions, and auto-configure tokenizers
227        let registry = crate::tokenizer::TokenizerRegistry::new();
228        let mut num_indexed_fields = 0;
229        let mut field_to_slot = FxHashMap::default();
230        let mut position_enabled_fields = FxHashMap::default();
231        let mut tokenizers = FxHashMap::default();
232        for (field, entry) in schema.fields() {
233            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
234                field_to_slot.insert(field.0, num_indexed_fields);
235                num_indexed_fields += 1;
236                if entry.positions.is_some() {
237                    position_enabled_fields.insert(field.0, entry.positions);
238                }
239                if let Some(ref tok_name) = entry.tokenizer
240                    && let Some(tokenizer) = registry.get(tok_name)
241                {
242                    tokenizers.insert(field, tokenizer);
243                }
244            }
245        }
246
247        // Initialize fast-field writers for fields with fast=true
248        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
249        let mut fast_fields = FxHashMap::default();
250        for (field, entry) in schema.fields() {
251            if entry.fast {
252                let writer = if entry.multi {
253                    match entry.field_type {
254                        FieldType::U64 => {
255                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
256                        }
257                        FieldType::I64 => {
258                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
259                        }
260                        FieldType::F64 => {
261                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
262                        }
263                        FieldType::Text => FastFieldWriter::new_text_multi(),
264                        _ => continue,
265                    }
266                } else {
267                    match entry.field_type {
268                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
269                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
270                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
271                        FieldType::Text => FastFieldWriter::new_text(),
272                        _ => continue,
273                    }
274                };
275                fast_fields.insert(field.0, writer);
276            }
277        }
278
279        Ok(Self {
280            schema,
281            tokenizers,
282            term_interner: Rodeo::new(),
283            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
284            #[cfg(feature = "native")]
285            store_file,
286            #[cfg(feature = "native")]
287            store_path,
288            #[cfg(not(feature = "native"))]
289            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
290            next_doc_id: 0,
291            field_stats: FxHashMap::default(),
292            doc_field_lengths: Vec::new(),
293            num_indexed_fields,
294            field_to_slot,
295            local_tf_buffer: FxHashMap::default(),
296            local_positions: FxHashMap::default(),
297            token_buffer: String::with_capacity(64),
298            numeric_buffer: String::with_capacity(32),
299            config,
300            dense_vectors: FxHashMap::default(),
301            binary_dense_vectors: FxHashMap::default(),
302            sparse_vectors: FxHashMap::default(),
303            position_index: HashMap::new(),
304            position_enabled_fields,
305            current_element_ordinal: FxHashMap::default(),
306            estimated_memory: 0,
307            doc_serialize_buffer: Vec::with_capacity(256),
308            fast_fields,
309        })
310    }
311
    /// Install `tokenizer` for `field`, replacing any tokenizer previously
    /// registered for it (including one auto-configured from the schema).
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
315
316    /// Get the current element ordinal for a field and increment it.
317    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
318    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
319        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
320        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
321        ordinal
322    }
323
    /// Number of documents added so far (the id the next document will get).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
327
    /// Fast O(1) memory estimate - updated incrementally during indexing
    ///
    /// Returns the running counter maintained by the indexing methods; unlike
    /// `stats()` it does not walk any data structures.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
333
334    /// Count total unique sparse dimensions across all fields
335    pub fn sparse_dim_count(&self) -> usize {
336        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
337    }
338
339    /// Get current statistics for debugging performance (expensive - iterates all data)
340    pub fn stats(&self) -> SegmentBuilderStats {
341        use std::mem::size_of;
342
343        let postings_in_memory: usize =
344            self.inverted_index.values().map(|p| p.postings.len()).sum();
345
346        // Size constants computed from actual types
347        let compact_posting_size = size_of::<CompactPosting>();
348        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
349        let term_key_size = size_of::<TermKey>();
350        let posting_builder_size = size_of::<PostingListBuilder>();
351        let spur_size = size_of::<Spur>();
352        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
353
354        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
355        // Measured: ~(key_size + value_size + 8) per entry on average
356        let hashmap_entry_base_overhead = 8usize;
357
358        // FxHashMap uses same layout as hashbrown
359        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
360
361        // Postings memory
362        let postings_bytes: usize = self
363            .inverted_index
364            .values()
365            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
366            .sum();
367
368        // Inverted index overhead
369        let index_overhead_bytes = self.inverted_index.len()
370            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
371
372        // Term interner: Rodeo stores strings + metadata
373        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
374        let interner_arena_overhead = 2 * size_of::<usize>();
375        let avg_term_len = 8; // Estimated average term length
376        let interner_bytes =
377            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
378
379        // Doc field lengths
380        let field_lengths_bytes =
381            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
382
383        // Dense vectors
384        let mut dense_vectors_bytes: usize = 0;
385        let mut dense_vector_count: usize = 0;
386        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
387        for b in self.dense_vectors.values() {
388            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
389                + b.doc_ids.capacity() * doc_id_ordinal_size
390                + 2 * vec_overhead; // Two Vecs
391            dense_vector_count += b.doc_ids.len();
392        }
393        // Binary dense vectors
394        for b in self.binary_dense_vectors.values() {
395            dense_vectors_bytes += b.vectors.capacity()
396                + b.doc_ids.capacity() * doc_id_ordinal_size
397                + 2 * vec_overhead;
398            dense_vector_count += b.doc_ids.len();
399        }
400
401        // Local buffers
402        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
403        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
404
405        // Sparse vectors
406        let mut sparse_vectors_bytes: usize = 0;
407        for builder in self.sparse_vectors.values() {
408            for postings in builder.postings.values() {
409                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
410            }
411            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
412            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
413            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
414        }
415        // Outer FxHashMap overhead
416        let outer_sparse_entry_size =
417            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
418        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
419
420        // Position index
421        let mut position_index_bytes: usize = 0;
422        for pos_builder in self.position_index.values() {
423            for (_, positions) in &pos_builder.postings {
424                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
425            }
426            // Vec<(DocId, Vec<u32>)> entry size
427            let pos_entry_size = size_of::<DocId>() + vec_overhead;
428            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
429        }
430        // HashMap overhead for position_index
431        let pos_index_entry_size =
432            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
433        position_index_bytes += self.position_index.len() * pos_index_entry_size;
434
435        let estimated_memory_bytes = postings_bytes
436            + index_overhead_bytes
437            + interner_bytes
438            + field_lengths_bytes
439            + dense_vectors_bytes
440            + local_tf_buffer_bytes
441            + sparse_vectors_bytes
442            + position_index_bytes;
443
444        let memory_breakdown = MemoryBreakdown {
445            postings_bytes,
446            index_overhead_bytes,
447            interner_bytes,
448            field_lengths_bytes,
449            dense_vectors_bytes,
450            dense_vector_count,
451            sparse_vectors_bytes,
452            position_index_bytes,
453        };
454
455        SegmentBuilderStats {
456            num_docs: self.next_doc_id,
457            unique_terms: self.inverted_index.len(),
458            postings_in_memory,
459            interned_strings: self.term_interner.len(),
460            doc_field_lengths_size: self.doc_field_lengths.len(),
461            estimated_memory_bytes,
462            memory_breakdown,
463        }
464    }
465
466    /// Add a document - streams to disk immediately
467    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
468        let doc_id = self.next_doc_id;
469        self.next_doc_id += 1;
470
471        // Initialize field lengths for this document
472        let base_idx = self.doc_field_lengths.len();
473        self.doc_field_lengths
474            .resize(base_idx + self.num_indexed_fields, 0);
475        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
476
477        // Reset element ordinals for this document (for multi-valued fields)
478        self.current_element_ordinal.clear();
479
480        for (field, value) in doc.field_values() {
481            let Some(entry) = self.schema.get_field_entry(*field) else {
482                continue;
483            };
484
485            // Dense/binary vectors are written to .vectors when indexed || stored
486            // Other field types require indexed or fast
487            if !matches!(
488                &entry.field_type,
489                FieldType::DenseVector | FieldType::BinaryDenseVector
490            ) && !entry.indexed
491                && !entry.fast
492            {
493                continue;
494            }
495
496            match (&entry.field_type, value) {
497                (FieldType::Text, FieldValue::Text(text)) => {
498                    if entry.indexed {
499                        let element_ordinal = self.next_element_ordinal(field.0);
500                        let token_count =
501                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
502
503                        let stats = self.field_stats.entry(field.0).or_default();
504                        stats.total_tokens += token_count as u64;
505                        if element_ordinal == 0 {
506                            stats.doc_count += 1;
507                        }
508
509                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
510                            self.doc_field_lengths[base_idx + slot] = token_count;
511                        }
512                    }
513
514                    // Fast-field: store raw text for text ordinal column
515                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
516                        ff.add_text(doc_id, text);
517                    }
518                }
519                (FieldType::U64, FieldValue::U64(v)) => {
520                    if entry.indexed {
521                        self.index_numeric_field(*field, doc_id, *v)?;
522                    }
523                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
524                        ff.add_u64(doc_id, *v);
525                    }
526                }
527                (FieldType::I64, FieldValue::I64(v)) => {
528                    if entry.indexed {
529                        self.index_numeric_field(*field, doc_id, *v as u64)?;
530                    }
531                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
532                        ff.add_i64(doc_id, *v);
533                    }
534                }
535                (FieldType::F64, FieldValue::F64(v)) => {
536                    if entry.indexed {
537                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
538                    }
539                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
540                        ff.add_f64(doc_id, *v);
541                    }
542                }
543                (FieldType::DenseVector, FieldValue::DenseVector(vec))
544                    if entry.indexed || entry.stored =>
545                {
546                    let ordinal = self.next_element_ordinal(field.0);
547                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
548                }
549                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
550                    if entry.indexed || entry.stored =>
551                {
552                    let ordinal = self.next_element_ordinal(field.0);
553                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
554                }
555                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
556                    let ordinal = self.next_element_ordinal(field.0);
557                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
558                }
559                _ => {}
560            }
561        }
562
563        // Stream document to disk immediately
564        self.write_document_to_store(&doc)?;
565
566        Ok(doc_id)
567    }
568
    /// Index a text field using interned terms
    ///
    /// Uses a custom tokenizer when one is registered for the field (via
    /// `set_tokenizer` or auto-configured in `new`), otherwise falls back to
    /// an inline zero-allocation path (split_whitespace + lowercase + strip
    /// non-alphanumeric).
    ///
    /// If position recording is enabled for this field, one encoded position
    /// is also recorded per token, depending on the field's `PositionMode`:
    /// - `Ordinal`: `element_ordinal << 20`
    /// - `TokenPosition`: the token position
    /// - `Full`: `(element_ordinal << 20) | token_position`
    ///
    /// Returns the number of tokens indexed: the length of the custom
    /// tokenizer's output, or the number of non-empty tokens on the inline path.
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        // `None` when the field has no position tracking configured.
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Phase 1: Aggregate term frequencies within this document
        // Also collect positions if enabled
        // Reuse buffers to avoid allocations
        self.local_tf_buffer.clear();
        // Clear position Vecs in-place (keeps allocated capacity for reuse)
        // NOTE(review): entries are never removed from `local_positions` in
        // this function, so this loop visits every term that has ever had
        // positions recorded, not just the previous call's terms — confirm
        // the map is trimmed elsewhere or consider clearing only used entries.
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
        // The owned Vec<Token> is computed first so the immutable borrow of
        // self.tokenizers ends before we mutate other fields.
        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));

        if let Some(tokens) = custom_tokens {
            // Custom tokenizer path
            for token in &tokens {
                // Look up first so memory is only charged for genuinely new terms.
                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&token.text);
                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    // Positions come from the tokenizer on this path.
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token.position,
                        PositionMode::Full => (element_ordinal << 20) | token.position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }
            }
            // Token count reported to the caller for this path.
            token_position = tokens.len() as u32;
        } else {
            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
            for word in text.split_whitespace() {
                self.token_buffer.clear();
                for c in word.chars() {
                    if c.is_alphanumeric() {
                        for lc in c.to_lowercase() {
                            self.token_buffer.push(lc);
                        }
                    }
                }

                // Words made only of non-alphanumeric characters produce no
                // token and do not advance the position counter.
                if self.token_buffer.is_empty() {
                    continue;
                }

                // Look up first so memory is only charged for genuinely new terms.
                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
                    spur
                } else {
                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
                    spur
                };
                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

                if let Some(mode) = position_mode {
                    let encoded_pos = match mode {
                        PositionMode::Ordinal => element_ordinal << 20,
                        PositionMode::TokenPosition => token_position,
                        PositionMode::Full => (element_ordinal << 20) | token_position,
                    };
                    self.local_positions
                        .entry(term_spur)
                        .or_default()
                        .push(encoded_pos);
                }

                token_position += 1;
            }
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            match self.inverted_index.entry(term_key) {
                hashbrown::hash_map::Entry::Occupied(mut o) => {
                    o.get_mut().add(doc_id, tf);
                    self.estimated_memory += size_of::<CompactPosting>();
                }
                hashbrown::hash_map::Entry::Vacant(v) => {
                    // First occurrence of this term in this field: also charge
                    // the per-term map overhead.
                    let mut posting = PostingListBuilder::new();
                    posting.add(doc_id, tf);
                    v.insert(posting);
                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
                }
            }

            // Mirror the term's collected positions into the position index.
            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                match self.position_index.entry(term_key) {
                    hashbrown::hash_map::Entry::Occupied(mut o) => {
                        for &pos in positions {
                            o.get_mut().add_position(doc_id, pos);
                        }
                        self.estimated_memory += positions.len() * size_of::<u32>();
                    }
                    hashbrown::hash_map::Entry::Vacant(v) => {
                        let mut pos_posting = PositionPostingListBuilder::new();
                        for &pos in positions {
                            pos_posting.add_position(doc_id, pos);
                        }
                        self.estimated_memory +=
                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
                        v.insert(pos_posting);
                    }
                }
            }
        }

        Ok(token_position)
    }
721
722    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
723        use std::fmt::Write;
724
725        self.numeric_buffer.clear();
726        write!(self.numeric_buffer, "__num_{}", value).unwrap();
727        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
728            spur
729        } else {
730            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
731            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
732            spur
733        };
734
735        let term_key = TermKey {
736            field: field.0,
737            term: term_spur,
738        };
739
740        match self.inverted_index.entry(term_key) {
741            hashbrown::hash_map::Entry::Occupied(mut o) => {
742                o.get_mut().add(doc_id, 1);
743                self.estimated_memory += size_of::<CompactPosting>();
744            }
745            hashbrown::hash_map::Entry::Vacant(v) => {
746                let mut posting = PostingListBuilder::new();
747                posting.add(doc_id, 1);
748                v.insert(posting);
749                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
750            }
751        }
752
753        Ok(())
754    }
755
756    /// Index a dense vector field with ordinal tracking
757    fn index_dense_vector_field(
758        &mut self,
759        field: Field,
760        doc_id: DocId,
761        ordinal: u16,
762        vector: &[f32],
763    ) -> Result<()> {
764        let dim = vector.len();
765
766        let builder = self
767            .dense_vectors
768            .entry(field.0)
769            .or_insert_with(|| DenseVectorBuilder::new(dim));
770
771        // Verify dimension consistency
772        if builder.dim != dim && builder.len() > 0 {
773            return Err(crate::Error::Schema(format!(
774                "Dense vector dimension mismatch: expected {}, got {}",
775                builder.dim, dim
776            )));
777        }
778
779        builder.add(doc_id, ordinal, vector);
780
781        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
782
783        Ok(())
784    }
785
786    /// Index a binary dense vector field with ordinal tracking
787    fn index_binary_dense_vector_field(
788        &mut self,
789        field: Field,
790        doc_id: DocId,
791        ordinal: u16,
792        bytes: &[u8],
793    ) -> Result<()> {
794        let dim_bits = self
795            .schema
796            .get_field_entry(field)
797            .and_then(|e| e.binary_dense_vector_config.as_ref())
798            .map(|c| c.dim)
799            .ok_or_else(|| {
800                crate::Error::Schema("BinaryDenseVector field missing config".to_string())
801            })?;
802
803        let expected_byte_len = dim_bits.div_ceil(8);
804        if bytes.len() != expected_byte_len {
805            return Err(crate::Error::Schema(format!(
806                "Binary vector byte length mismatch: expected {} (dim={}), got {}",
807                expected_byte_len,
808                dim_bits,
809                bytes.len()
810            )));
811        }
812
813        let builder = self
814            .binary_dense_vectors
815            .entry(field.0)
816            .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
817
818        builder.add(doc_id, ordinal, bytes);
819        self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
820
821        Ok(())
822    }
823
824    /// Index a sparse vector field using dedicated sparse posting lists
825    ///
826    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
827    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
828    ///
829    /// Weights below the configured `weight_threshold` are not indexed.
830    fn index_sparse_vector_field(
831        &mut self,
832        field: Field,
833        doc_id: DocId,
834        ordinal: u16,
835        entries: &[(u32, f32)],
836    ) -> Result<()> {
837        // Get weight threshold from field config (default 0.0 = no filtering)
838        let weight_threshold = self
839            .schema
840            .get_field_entry(field)
841            .and_then(|entry| entry.sparse_vector_config.as_ref())
842            .map(|config| config.weight_threshold)
843            .unwrap_or(0.0);
844
845        let builder = self
846            .sparse_vectors
847            .entry(field.0)
848            .or_insert_with(SparseVectorBuilder::new);
849
850        builder.inc_vector_count();
851
852        for &(dim_id, weight) in entries {
853            // Skip weights below threshold
854            if weight.abs() < weight_threshold {
855                continue;
856            }
857
858            let is_new_dim = !builder.postings.contains_key(&dim_id);
859            builder.add(dim_id, doc_id, ordinal, weight);
860            self.estimated_memory += size_of::<(DocId, u16, f32)>();
861            if is_new_dim {
862                // HashMap entry overhead + Vec header
863                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
864            }
865        }
866
867        Ok(())
868    }
869
870    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
871    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
872        use byteorder::{LittleEndian, WriteBytesExt};
873
874        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
875
876        #[cfg(feature = "native")]
877        {
878            self.store_file
879                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
880            self.store_file.write_all(&self.doc_serialize_buffer)?;
881        }
882        #[cfg(not(feature = "native"))]
883        {
884            self.store_buffer
885                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
886            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
887        }
888
889        Ok(())
890    }
891
892    /// Build the final segment
893    ///
894    /// Streams all data directly to disk via StreamingWriter to avoid buffering
895    /// entire serialized outputs in memory. Each phase consumes and drops its
896    /// source data before the next phase begins.
897    pub async fn build<D: Directory + DirectoryWriter>(
898        mut self,
899        dir: &D,
900        segment_id: SegmentId,
901        trained: Option<&super::TrainedVectorStructures>,
902    ) -> Result<SegmentMeta> {
903        // Flush any buffered data
904        #[cfg(feature = "native")]
905        self.store_file.flush()?;
906
907        let files = SegmentFiles::new(segment_id.0);
908
909        // Phase 1: Stream positions directly to disk (consumes position_index)
910        let position_index = std::mem::take(&mut self.position_index);
911        let position_offsets = if !position_index.is_empty() {
912            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
913            let offsets = postings::build_positions_streaming(
914                position_index,
915                &self.term_interner,
916                &mut *pos_writer,
917            )?;
918            pos_writer.finish()?;
919            offsets
920        } else {
921            FxHashMap::default()
922        };
923
924        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
925        // These are fully independent: different source data, different output files.
926        let inverted_index = std::mem::take(&mut self.inverted_index);
927        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
928        #[cfg(feature = "native")]
929        let store_path = self.store_path.clone();
930        #[cfg(feature = "native")]
931        let num_compression_threads = self.config.num_compression_threads;
932        let compression_level = self.config.compression_level;
933        let dense_vectors = std::mem::take(&mut self.dense_vectors);
934        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
935        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
936        let schema = &self.schema;
937
938        // Pre-create all streaming writers (async) before entering sync rayon scope
939        // Wrapped in OffsetWriter to track bytes written per phase.
940        let mut term_dict_writer =
941            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
942        let mut postings_writer =
943            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
944        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
945        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
946            Some(super::OffsetWriter::new(
947                dir.streaming_writer(&files.vectors).await?,
948            ))
949        } else {
950            None
951        };
952        let mut sparse_writer = if !sparse_vectors.is_empty() {
953            Some(super::OffsetWriter::new(
954                dir.streaming_writer(&files.sparse).await?,
955            ))
956        } else {
957            None
958        };
959        let mut fast_fields = std::mem::take(&mut self.fast_fields);
960        let num_docs = self.next_doc_id;
961        let mut fast_writer = if !fast_fields.is_empty() {
962            Some(super::OffsetWriter::new(
963                dir.streaming_writer(&files.fast).await?,
964            ))
965        } else {
966            None
967        };
968
969        #[cfg(feature = "native")]
970        {
971            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
972                rayon::join(
973                    || {
974                        rayon::join(
975                            || {
976                                postings::build_postings_streaming(
977                                    inverted_index,
978                                    term_interner,
979                                    &position_offsets,
980                                    &mut term_dict_writer,
981                                    &mut postings_writer,
982                                )
983                            },
984                            || {
985                                store::build_store_streaming(
986                                    &store_path,
987                                    num_compression_threads,
988                                    compression_level,
989                                    &mut store_writer,
990                                    num_docs,
991                                )
992                            },
993                        )
994                    },
995                    || {
996                        rayon::join(
997                            || {
998                                rayon::join(
999                                    || -> Result<()> {
1000                                        if let Some(ref mut w) = vectors_writer {
1001                                            dense::build_vectors_streaming(
1002                                                dense_vectors,
1003                                                binary_dense_vectors,
1004                                                schema,
1005                                                trained,
1006                                                w,
1007                                            )?;
1008                                        }
1009                                        Ok(())
1010                                    },
1011                                    || -> Result<()> {
1012                                        if let Some(ref mut w) = sparse_writer {
1013                                            sparse::build_sparse_streaming(
1014                                                &mut sparse_vectors,
1015                                                schema,
1016                                                w,
1017                                            )?;
1018                                        }
1019                                        Ok(())
1020                                    },
1021                                )
1022                            },
1023                            || -> Result<()> {
1024                                if let Some(ref mut w) = fast_writer {
1025                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
1026                                }
1027                                Ok(())
1028                            },
1029                        )
1030                    },
1031                );
1032            postings_result?;
1033            store_result?;
1034            vectors_result?;
1035            sparse_result?;
1036            fast_result?;
1037        }
1038
1039        #[cfg(not(feature = "native"))]
1040        {
1041            postings::build_postings_streaming(
1042                inverted_index,
1043                term_interner,
1044                &position_offsets,
1045                &mut term_dict_writer,
1046                &mut postings_writer,
1047            )?;
1048            store::build_store_streaming_from_buffer(
1049                &self.store_buffer,
1050                compression_level,
1051                &mut store_writer,
1052                num_docs,
1053            )?;
1054            if let Some(ref mut w) = vectors_writer {
1055                dense::build_vectors_streaming(
1056                    dense_vectors,
1057                    binary_dense_vectors,
1058                    schema,
1059                    trained,
1060                    w,
1061                )?;
1062            }
1063            if let Some(ref mut w) = sparse_writer {
1064                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
1065            }
1066            if let Some(ref mut w) = fast_writer {
1067                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
1068            }
1069        }
1070
1071        let term_dict_bytes = term_dict_writer.offset() as usize;
1072        let postings_bytes = postings_writer.offset() as usize;
1073        let store_bytes = store_writer.offset() as usize;
1074        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
1075        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
1076        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
1077
1078        term_dict_writer.finish()?;
1079        postings_writer.finish()?;
1080        store_writer.finish()?;
1081        if let Some(w) = vectors_writer {
1082            w.finish()?;
1083        }
1084        if let Some(w) = sparse_writer {
1085            w.finish()?;
1086        }
1087        if let Some(w) = fast_writer {
1088            w.finish()?;
1089        }
1090        drop(position_offsets);
1091        drop(sparse_vectors);
1092
1093        log::info!(
1094            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
1095            num_docs,
1096            super::format_bytes(term_dict_bytes),
1097            super::format_bytes(postings_bytes),
1098            super::format_bytes(store_bytes),
1099            super::format_bytes(vectors_bytes),
1100            super::format_bytes(sparse_bytes),
1101            super::format_bytes(fast_bytes),
1102        );
1103
1104        let meta = SegmentMeta {
1105            id: segment_id.0,
1106            num_docs: self.next_doc_id,
1107            field_stats: self.field_stats.clone(),
1108        };
1109
1110        dir.write(&files.meta, &meta.serialize()?).await?;
1111
1112        // Cleanup temp files
1113        #[cfg(feature = "native")]
1114        {
1115            let _ = std::fs::remove_file(&self.store_path);
1116        }
1117
1118        Ok(meta)
1119    }
1120}
1121
1122/// Serialize all fast-field columns to a `.fast` file.
1123fn build_fast_fields_streaming(
1124    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
1125    num_docs: u32,
1126    writer: &mut dyn Write,
1127) -> Result<()> {
1128    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1129
1130    if fast_fields.is_empty() {
1131        return Ok(());
1132    }
1133
1134    // Sort fields by id for deterministic output
1135    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1136    field_ids.sort_unstable();
1137
1138    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1139    let mut current_offset = 0u64;
1140
1141    for &field_id in &field_ids {
1142        let ff = fast_fields.get_mut(&field_id).unwrap();
1143        ff.pad_to(num_docs);
1144
1145        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1146        toc.field_id = field_id;
1147        current_offset += bytes_written;
1148        toc_entries.push(toc);
1149    }
1150
1151    // Write TOC + footer
1152    let toc_offset = current_offset;
1153    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1154
1155    Ok(())
1156}
1157
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    /// Remove the temporary store file when the builder goes away.
    ///
    /// This is best-effort: any error from the removal is discarded.
    fn drop(&mut self) {
        std::fs::remove_file(&self.store_path).ok();
    }
}