Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11pub(crate) mod bmp;
12mod config;
13mod dense;
14#[cfg(feature = "diagnostics")]
15mod diagnostics;
16pub(crate) mod graph_bisection;
17mod postings;
18mod sparse;
19mod store;
20
21pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
22
23#[cfg(feature = "native")]
24use std::fs::{File, OpenOptions};
25#[cfg(feature = "native")]
26use std::io::BufWriter;
27use std::io::Write;
28use std::mem::size_of;
29#[cfg(feature = "native")]
30use std::path::PathBuf;
31
32use hashbrown::HashMap;
33use rustc_hash::FxHashMap;
34
35// String interning: lasso on native (fast arena), HashMap on WASM (no C deps)
36#[cfg(feature = "native")]
37use lasso::{Rodeo, Spur};
38
39#[cfg(not(feature = "native"))]
40pub(crate) mod simple_interner {
41    use hashbrown::HashMap;
42
43    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
44    pub struct Spur(u32);
45
46    /// Simple string interner for WASM (replaces lasso::Rodeo).
47    /// Stores each string once in a Vec; HashMap maps &str → index.
48    pub struct Rodeo {
49        /// Canonical storage — each string lives here exactly once.
50        strings: Vec<Box<str>>,
51        /// Maps borrowed string slices (pointing into `strings`) to their index.
52        /// Safety: entries are never removed and Box<str> has a stable address.
53        map: HashMap<&'static str, u32>,
54    }
55
56    impl Rodeo {
57        pub fn new() -> Self {
58            Self {
59                strings: Vec::new(),
60                map: HashMap::new(),
61            }
62        }
63
64        pub fn get(&self, key: &str) -> Option<Spur> {
65            self.map.get(key).map(|&id| Spur(id))
66        }
67
68        pub fn get_or_intern(&mut self, key: &str) -> Spur {
69            if let Some(&id) = self.map.get(key) {
70                return Spur(id);
71            }
72            let id = self.strings.len() as u32;
73            let boxed: Box<str> = key.into();
74            // Safety: the Box<str> is stored in self.strings (append-only Vec)
75            // and never moved or freed while the Rodeo is alive.
76            let static_ref: &'static str = unsafe { &*(boxed.as_ref() as *const str) };
77            self.strings.push(boxed);
78            self.map.insert(static_ref, id);
79            Spur(id)
80        }
81
82        pub fn resolve(&self, spur: &Spur) -> &str {
83            &self.strings[spur.0 as usize]
84        }
85
86        pub fn len(&self) -> usize {
87            self.strings.len()
88        }
89    }
90}
91
92#[cfg(not(feature = "native"))]
93use simple_interner::{Rodeo, Spur};
94
95use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
96use std::sync::Arc;
97
98use crate::directories::{Directory, DirectoryWriter};
99use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
100use crate::tokenizer::BoxedTokenizer;
101use crate::{DocId, Result};
102
103use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
104use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
105use sparse::SparseVectorBuilder;
106
107/// Size of the document store buffer before writing to disk
108const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
109
110/// Memory overhead per new term in the inverted index:
111/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
112const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
113
114/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
115const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
116
117/// Memory overhead per new term in the position index
118const NEW_POS_TERM_OVERHEAD: usize =
119    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
120
121/// Segment builder with optimized memory usage
122///
123/// Features:
124/// - Streams documents to disk immediately (no in-memory document storage)
125/// - Uses string interning for terms (reduced allocations)
126/// - Uses hashbrown HashMap (faster than BTreeMap)
127pub struct SegmentBuilder {
128    schema: Arc<Schema>,
129    config: SegmentBuilderConfig,
130    tokenizers: FxHashMap<Field, BoxedTokenizer>,
131
132    /// String interner for terms - O(1) lookup and deduplication
133    term_interner: Rodeo,
134
135    /// Inverted index: term key -> posting list
136    inverted_index: HashMap<TermKey, PostingListBuilder>,
137
138    /// Streaming document store writer (native: temp file on disk, WASM: in-memory buffer)
139    #[cfg(feature = "native")]
140    store_file: BufWriter<File>,
141    #[cfg(feature = "native")]
142    store_path: PathBuf,
143    #[cfg(not(feature = "native"))]
144    store_buffer: Vec<u8>,
145
146    /// Document count
147    next_doc_id: DocId,
148
149    /// Per-field statistics for BM25F
150    field_stats: FxHashMap<u32, FieldStats>,
151
152    /// Per-document field lengths stored compactly
153    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
154    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
155    doc_field_lengths: Vec<u32>,
156    num_indexed_fields: usize,
157    field_to_slot: FxHashMap<u32, usize>,
158
159    /// Reusable buffer for per-document term frequency aggregation
160    /// Avoids allocating a new hashmap for each document
161    local_tf_buffer: FxHashMap<Spur, u32>,
162
163    /// Reusable buffer for per-document position tracking (when positions enabled)
164    /// Avoids allocating a new hashmap for each text field per document
165    local_positions: FxHashMap<Spur, Vec<u32>>,
166
167    /// Reusable buffer for tokenization to avoid per-token String allocations
168    token_buffer: String,
169
170    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
171    numeric_buffer: String,
172
173    /// Dense vector storage per field: field -> (doc_ids, vectors)
174    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
175    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
176
177    /// Binary dense vector storage per field: field -> packed-bit vectors
178    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,
179
180    /// Sparse vector storage per field: field -> SparseVectorBuilder
181    /// Uses proper BlockSparsePostingList with configurable quantization
182    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
183
184    /// Position index for fields with positions enabled
185    /// term key -> position posting list
186    position_index: HashMap<TermKey, PositionPostingListBuilder>,
187
188    /// Fields that have position tracking enabled, with their mode
189    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
190
191    /// Current element ordinal for multi-valued fields (reset per document)
192    current_element_ordinal: FxHashMap<u32, u32>,
193
194    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
195    estimated_memory: usize,
196
197    /// Reusable buffer for document serialization (avoids per-document allocation)
198    doc_serialize_buffer: Vec<u8>,
199
200    /// Fast-field columnar writers per field_id (only for fields with fast=true)
201    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
202}
203
204impl SegmentBuilder {
205    /// Create a new segment builder
206    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
207        #[cfg(feature = "native")]
208        let (store_file, store_path) = {
209            let segment_id = uuid::Uuid::new_v4();
210            let store_path = config
211                .temp_dir
212                .join(format!("hermes_store_{}.tmp", segment_id));
213            let store_file = BufWriter::with_capacity(
214                STORE_BUFFER_SIZE,
215                OpenOptions::new()
216                    .create(true)
217                    .write(true)
218                    .truncate(true)
219                    .open(&store_path)?,
220            );
221            (store_file, store_path)
222        };
223
224        // Count indexed fields, track positions, and auto-configure tokenizers
225        let registry = crate::tokenizer::TokenizerRegistry::new();
226        let mut num_indexed_fields = 0;
227        let mut field_to_slot = FxHashMap::default();
228        let mut position_enabled_fields = FxHashMap::default();
229        let mut tokenizers = FxHashMap::default();
230        for (field, entry) in schema.fields() {
231            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
232                field_to_slot.insert(field.0, num_indexed_fields);
233                num_indexed_fields += 1;
234                if entry.positions.is_some() {
235                    position_enabled_fields.insert(field.0, entry.positions);
236                }
237                if let Some(ref tok_name) = entry.tokenizer
238                    && let Some(tokenizer) = registry.get(tok_name)
239                {
240                    tokenizers.insert(field, tokenizer);
241                }
242            }
243        }
244
245        // Initialize fast-field writers for fields with fast=true
246        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
247        let mut fast_fields = FxHashMap::default();
248        for (field, entry) in schema.fields() {
249            if entry.fast {
250                let writer = if entry.multi {
251                    match entry.field_type {
252                        FieldType::U64 => {
253                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
254                        }
255                        FieldType::I64 => {
256                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
257                        }
258                        FieldType::F64 => {
259                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
260                        }
261                        FieldType::Text => FastFieldWriter::new_text_multi(),
262                        _ => continue,
263                    }
264                } else {
265                    match entry.field_type {
266                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
267                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
268                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
269                        FieldType::Text => FastFieldWriter::new_text(),
270                        _ => continue,
271                    }
272                };
273                fast_fields.insert(field.0, writer);
274            }
275        }
276
277        Ok(Self {
278            schema,
279            tokenizers,
280            term_interner: Rodeo::new(),
281            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
282            #[cfg(feature = "native")]
283            store_file,
284            #[cfg(feature = "native")]
285            store_path,
286            #[cfg(not(feature = "native"))]
287            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
288            next_doc_id: 0,
289            field_stats: FxHashMap::default(),
290            doc_field_lengths: Vec::new(),
291            num_indexed_fields,
292            field_to_slot,
293            local_tf_buffer: FxHashMap::default(),
294            local_positions: FxHashMap::default(),
295            token_buffer: String::with_capacity(64),
296            numeric_buffer: String::with_capacity(32),
297            config,
298            dense_vectors: FxHashMap::default(),
299            binary_dense_vectors: FxHashMap::default(),
300            sparse_vectors: FxHashMap::default(),
301            position_index: HashMap::new(),
302            position_enabled_fields,
303            current_element_ordinal: FxHashMap::default(),
304            estimated_memory: 0,
305            doc_serialize_buffer: Vec::with_capacity(256),
306            fast_fields,
307        })
308    }
309
310    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
311        self.tokenizers.insert(field, tokenizer);
312    }
313
314    /// Get the current element ordinal for a field and increment it.
315    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
316    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
317        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
318        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
319        ordinal
320    }
321
322    pub fn num_docs(&self) -> u32 {
323        self.next_doc_id
324    }
325
326    /// Fast O(1) memory estimate - updated incrementally during indexing
327    #[inline]
328    pub fn estimated_memory_bytes(&self) -> usize {
329        self.estimated_memory
330    }
331
332    /// Count total unique sparse dimensions across all fields
333    pub fn sparse_dim_count(&self) -> usize {
334        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
335    }
336
337    /// Get current statistics for debugging performance (expensive - iterates all data)
338    pub fn stats(&self) -> SegmentBuilderStats {
339        use std::mem::size_of;
340
341        let postings_in_memory: usize =
342            self.inverted_index.values().map(|p| p.postings.len()).sum();
343
344        // Size constants computed from actual types
345        let compact_posting_size = size_of::<CompactPosting>();
346        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
347        let term_key_size = size_of::<TermKey>();
348        let posting_builder_size = size_of::<PostingListBuilder>();
349        let spur_size = size_of::<Spur>();
350        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
351
352        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
353        // Measured: ~(key_size + value_size + 8) per entry on average
354        let hashmap_entry_base_overhead = 8usize;
355
356        // FxHashMap uses same layout as hashbrown
357        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
358
359        // Postings memory
360        let postings_bytes: usize = self
361            .inverted_index
362            .values()
363            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
364            .sum();
365
366        // Inverted index overhead
367        let index_overhead_bytes = self.inverted_index.len()
368            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
369
370        // Term interner: Rodeo stores strings + metadata
371        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
372        let interner_arena_overhead = 2 * size_of::<usize>();
373        let avg_term_len = 8; // Estimated average term length
374        let interner_bytes =
375            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
376
377        // Doc field lengths
378        let field_lengths_bytes =
379            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
380
381        // Dense vectors
382        let mut dense_vectors_bytes: usize = 0;
383        let mut dense_vector_count: usize = 0;
384        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
385        for b in self.dense_vectors.values() {
386            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
387                + b.doc_ids.capacity() * doc_id_ordinal_size
388                + 2 * vec_overhead; // Two Vecs
389            dense_vector_count += b.doc_ids.len();
390        }
391        // Binary dense vectors
392        for b in self.binary_dense_vectors.values() {
393            dense_vectors_bytes += b.vectors.capacity()
394                + b.doc_ids.capacity() * doc_id_ordinal_size
395                + 2 * vec_overhead;
396            dense_vector_count += b.doc_ids.len();
397        }
398
399        // Local buffers
400        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
401        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
402
403        // Sparse vectors
404        let mut sparse_vectors_bytes: usize = 0;
405        for builder in self.sparse_vectors.values() {
406            for postings in builder.postings.values() {
407                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
408            }
409            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
410            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
411            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
412        }
413        // Outer FxHashMap overhead
414        let outer_sparse_entry_size =
415            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
416        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
417
418        // Position index
419        let mut position_index_bytes: usize = 0;
420        for pos_builder in self.position_index.values() {
421            for (_, positions) in &pos_builder.postings {
422                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
423            }
424            // Vec<(DocId, Vec<u32>)> entry size
425            let pos_entry_size = size_of::<DocId>() + vec_overhead;
426            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
427        }
428        // HashMap overhead for position_index
429        let pos_index_entry_size =
430            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
431        position_index_bytes += self.position_index.len() * pos_index_entry_size;
432
433        let estimated_memory_bytes = postings_bytes
434            + index_overhead_bytes
435            + interner_bytes
436            + field_lengths_bytes
437            + dense_vectors_bytes
438            + local_tf_buffer_bytes
439            + sparse_vectors_bytes
440            + position_index_bytes;
441
442        let memory_breakdown = MemoryBreakdown {
443            postings_bytes,
444            index_overhead_bytes,
445            interner_bytes,
446            field_lengths_bytes,
447            dense_vectors_bytes,
448            dense_vector_count,
449            sparse_vectors_bytes,
450            position_index_bytes,
451        };
452
453        SegmentBuilderStats {
454            num_docs: self.next_doc_id,
455            unique_terms: self.inverted_index.len(),
456            postings_in_memory,
457            interned_strings: self.term_interner.len(),
458            doc_field_lengths_size: self.doc_field_lengths.len(),
459            estimated_memory_bytes,
460            memory_breakdown,
461        }
462    }
463
464    /// Add a document - streams to disk immediately
465    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
466        let doc_id = self.next_doc_id;
467        self.next_doc_id += 1;
468
469        // Initialize field lengths for this document
470        let base_idx = self.doc_field_lengths.len();
471        self.doc_field_lengths
472            .resize(base_idx + self.num_indexed_fields, 0);
473        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
474
475        // Reset element ordinals for this document (for multi-valued fields)
476        self.current_element_ordinal.clear();
477
478        for (field, value) in doc.field_values() {
479            let Some(entry) = self.schema.get_field_entry(*field) else {
480                continue;
481            };
482
483            // Dense/binary vectors are written to .vectors when indexed || stored
484            // Other field types require indexed or fast
485            if !matches!(
486                &entry.field_type,
487                FieldType::DenseVector | FieldType::BinaryDenseVector
488            ) && !entry.indexed
489                && !entry.fast
490            {
491                continue;
492            }
493
494            match (&entry.field_type, value) {
495                (FieldType::Text, FieldValue::Text(text)) => {
496                    if entry.indexed {
497                        let element_ordinal = self.next_element_ordinal(field.0);
498                        let token_count =
499                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
500
501                        let stats = self.field_stats.entry(field.0).or_default();
502                        stats.total_tokens += token_count as u64;
503                        if element_ordinal == 0 {
504                            stats.doc_count += 1;
505                        }
506
507                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
508                            self.doc_field_lengths[base_idx + slot] = token_count;
509                        }
510                    }
511
512                    // Fast-field: store raw text for text ordinal column
513                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
514                        ff.add_text(doc_id, text);
515                    }
516                }
517                (FieldType::U64, FieldValue::U64(v)) => {
518                    if entry.indexed {
519                        self.index_numeric_field(*field, doc_id, *v)?;
520                    }
521                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
522                        ff.add_u64(doc_id, *v);
523                    }
524                }
525                (FieldType::I64, FieldValue::I64(v)) => {
526                    if entry.indexed {
527                        self.index_numeric_field(*field, doc_id, *v as u64)?;
528                    }
529                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
530                        ff.add_i64(doc_id, *v);
531                    }
532                }
533                (FieldType::F64, FieldValue::F64(v)) => {
534                    if entry.indexed {
535                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
536                    }
537                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
538                        ff.add_f64(doc_id, *v);
539                    }
540                }
541                (FieldType::DenseVector, FieldValue::DenseVector(vec))
542                    if entry.indexed || entry.stored =>
543                {
544                    let ordinal = self.next_element_ordinal(field.0);
545                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
546                }
547                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
548                    if entry.indexed || entry.stored =>
549                {
550                    let ordinal = self.next_element_ordinal(field.0);
551                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
552                }
553                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
554                    let ordinal = self.next_element_ordinal(field.0);
555                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
556                }
557                _ => {}
558            }
559        }
560
561        // Stream document to disk immediately
562        self.write_document_to_store(&doc)?;
563
564        Ok(doc_id)
565    }
566
567    /// Index a text field using interned terms
568    ///
569    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
570    /// otherwise falls back to an inline zero-allocation path (split_whitespace
571    /// + lowercase + strip non-alphanumeric).
572    ///
573    /// If position recording is enabled for this field, also records token positions
574    /// encoded as (element_ordinal << 20) | token_position.
575    fn index_text_field(
576        &mut self,
577        field: Field,
578        doc_id: DocId,
579        text: &str,
580        element_ordinal: u32,
581    ) -> Result<u32> {
582        use crate::dsl::PositionMode;
583
584        let field_id = field.0;
585        let position_mode = self
586            .position_enabled_fields
587            .get(&field_id)
588            .copied()
589            .flatten();
590
591        // Phase 1: Aggregate term frequencies within this document
592        // Also collect positions if enabled
593        // Reuse buffers to avoid allocations
594        self.local_tf_buffer.clear();
595        // Clear position Vecs in-place (keeps allocated capacity for reuse)
596        for v in self.local_positions.values_mut() {
597            v.clear();
598        }
599
600        let mut token_position = 0u32;
601
602        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
603        // The owned Vec<Token> is computed first so the immutable borrow of
604        // self.tokenizers ends before we mutate other fields.
605        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
606
607        if let Some(tokens) = custom_tokens {
608            // Custom tokenizer path
609            for token in &tokens {
610                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
611                    spur
612                } else {
613                    let spur = self.term_interner.get_or_intern(&token.text);
614                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
615                    spur
616                };
617                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
618
619                if let Some(mode) = position_mode {
620                    let encoded_pos = match mode {
621                        PositionMode::Ordinal => element_ordinal << 20,
622                        PositionMode::TokenPosition => token.position,
623                        PositionMode::Full => (element_ordinal << 20) | token.position,
624                    };
625                    self.local_positions
626                        .entry(term_spur)
627                        .or_default()
628                        .push(encoded_pos);
629                }
630            }
631            token_position = tokens.len() as u32;
632        } else {
633            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
634            for word in text.split_whitespace() {
635                self.token_buffer.clear();
636                for c in word.chars() {
637                    if c.is_alphanumeric() {
638                        for lc in c.to_lowercase() {
639                            self.token_buffer.push(lc);
640                        }
641                    }
642                }
643
644                if self.token_buffer.is_empty() {
645                    continue;
646                }
647
648                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
649                    spur
650                } else {
651                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
652                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
653                    spur
654                };
655                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
656
657                if let Some(mode) = position_mode {
658                    let encoded_pos = match mode {
659                        PositionMode::Ordinal => element_ordinal << 20,
660                        PositionMode::TokenPosition => token_position,
661                        PositionMode::Full => (element_ordinal << 20) | token_position,
662                    };
663                    self.local_positions
664                        .entry(term_spur)
665                        .or_default()
666                        .push(encoded_pos);
667                }
668
669                token_position += 1;
670            }
671        }
672
673        // Phase 2: Insert aggregated terms into inverted index
674        // Now we only do one inverted_index lookup per unique term in doc
675        for (&term_spur, &tf) in &self.local_tf_buffer {
676            let term_key = TermKey {
677                field: field_id,
678                term: term_spur,
679            };
680
681            match self.inverted_index.entry(term_key) {
682                hashbrown::hash_map::Entry::Occupied(mut o) => {
683                    o.get_mut().add(doc_id, tf);
684                    self.estimated_memory += size_of::<CompactPosting>();
685                }
686                hashbrown::hash_map::Entry::Vacant(v) => {
687                    let mut posting = PostingListBuilder::new();
688                    posting.add(doc_id, tf);
689                    v.insert(posting);
690                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
691                }
692            }
693
694            if position_mode.is_some()
695                && let Some(positions) = self.local_positions.get(&term_spur)
696            {
697                match self.position_index.entry(term_key) {
698                    hashbrown::hash_map::Entry::Occupied(mut o) => {
699                        for &pos in positions {
700                            o.get_mut().add_position(doc_id, pos);
701                        }
702                        self.estimated_memory += positions.len() * size_of::<u32>();
703                    }
704                    hashbrown::hash_map::Entry::Vacant(v) => {
705                        let mut pos_posting = PositionPostingListBuilder::new();
706                        for &pos in positions {
707                            pos_posting.add_position(doc_id, pos);
708                        }
709                        self.estimated_memory +=
710                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
711                        v.insert(pos_posting);
712                    }
713                }
714            }
715        }
716
717        Ok(token_position)
718    }
719
720    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
721        use std::fmt::Write;
722
723        self.numeric_buffer.clear();
724        write!(self.numeric_buffer, "__num_{}", value).unwrap();
725        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
726            spur
727        } else {
728            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
729            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
730            spur
731        };
732
733        let term_key = TermKey {
734            field: field.0,
735            term: term_spur,
736        };
737
738        match self.inverted_index.entry(term_key) {
739            hashbrown::hash_map::Entry::Occupied(mut o) => {
740                o.get_mut().add(doc_id, 1);
741                self.estimated_memory += size_of::<CompactPosting>();
742            }
743            hashbrown::hash_map::Entry::Vacant(v) => {
744                let mut posting = PostingListBuilder::new();
745                posting.add(doc_id, 1);
746                v.insert(posting);
747                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
748            }
749        }
750
751        Ok(())
752    }
753
754    /// Index a dense vector field with ordinal tracking
755    fn index_dense_vector_field(
756        &mut self,
757        field: Field,
758        doc_id: DocId,
759        ordinal: u16,
760        vector: &[f32],
761    ) -> Result<()> {
762        let dim = vector.len();
763
764        let builder = self
765            .dense_vectors
766            .entry(field.0)
767            .or_insert_with(|| DenseVectorBuilder::new(dim));
768
769        // Verify dimension consistency
770        if builder.dim != dim && builder.len() > 0 {
771            return Err(crate::Error::Schema(format!(
772                "Dense vector dimension mismatch: expected {}, got {}",
773                builder.dim, dim
774            )));
775        }
776
777        builder.add(doc_id, ordinal, vector);
778
779        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
780
781        Ok(())
782    }
783
784    /// Index a binary dense vector field with ordinal tracking
785    fn index_binary_dense_vector_field(
786        &mut self,
787        field: Field,
788        doc_id: DocId,
789        ordinal: u16,
790        bytes: &[u8],
791    ) -> Result<()> {
792        let dim_bits = self
793            .schema
794            .get_field_entry(field)
795            .and_then(|e| e.binary_dense_vector_config.as_ref())
796            .map(|c| c.dim)
797            .ok_or_else(|| {
798                crate::Error::Schema("BinaryDenseVector field missing config".to_string())
799            })?;
800
801        let expected_byte_len = dim_bits.div_ceil(8);
802        if bytes.len() != expected_byte_len {
803            return Err(crate::Error::Schema(format!(
804                "Binary vector byte length mismatch: expected {} (dim={}), got {}",
805                expected_byte_len,
806                dim_bits,
807                bytes.len()
808            )));
809        }
810
811        let builder = self
812            .binary_dense_vectors
813            .entry(field.0)
814            .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
815
816        builder.add(doc_id, ordinal, bytes);
817        self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
818
819        Ok(())
820    }
821
822    /// Index a sparse vector field using dedicated sparse posting lists
823    ///
824    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
825    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
826    ///
827    /// Weights below the configured `weight_threshold` are not indexed.
828    fn index_sparse_vector_field(
829        &mut self,
830        field: Field,
831        doc_id: DocId,
832        ordinal: u16,
833        entries: &[(u32, f32)],
834    ) -> Result<()> {
835        // Get weight threshold from field config (default 0.0 = no filtering)
836        let weight_threshold = self
837            .schema
838            .get_field_entry(field)
839            .and_then(|entry| entry.sparse_vector_config.as_ref())
840            .map(|config| config.weight_threshold)
841            .unwrap_or(0.0);
842
843        let builder = self
844            .sparse_vectors
845            .entry(field.0)
846            .or_insert_with(SparseVectorBuilder::new);
847
848        builder.inc_vector_count();
849
850        for &(dim_id, weight) in entries {
851            // Skip weights below threshold
852            if weight.abs() < weight_threshold {
853                continue;
854            }
855
856            let is_new_dim = !builder.postings.contains_key(&dim_id);
857            builder.add(dim_id, doc_id, ordinal, weight);
858            self.estimated_memory += size_of::<(DocId, u16, f32)>();
859            if is_new_dim {
860                // HashMap entry overhead + Vec header
861                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
862            }
863        }
864
865        Ok(())
866    }
867
868    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
869    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
870        use byteorder::{LittleEndian, WriteBytesExt};
871
872        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
873
874        #[cfg(feature = "native")]
875        {
876            self.store_file
877                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
878            self.store_file.write_all(&self.doc_serialize_buffer)?;
879        }
880        #[cfg(not(feature = "native"))]
881        {
882            self.store_buffer
883                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
884            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
885        }
886
887        Ok(())
888    }
889
890    /// Build the final segment
891    ///
892    /// Streams all data directly to disk via StreamingWriter to avoid buffering
893    /// entire serialized outputs in memory. Each phase consumes and drops its
894    /// source data before the next phase begins.
895    pub async fn build<D: Directory + DirectoryWriter>(
896        mut self,
897        dir: &D,
898        segment_id: SegmentId,
899        trained: Option<&super::TrainedVectorStructures>,
900    ) -> Result<SegmentMeta> {
901        // Flush any buffered data
902        #[cfg(feature = "native")]
903        self.store_file.flush()?;
904
905        let files = SegmentFiles::new(segment_id.0);
906
907        // Phase 1: Stream positions directly to disk (consumes position_index)
908        let position_index = std::mem::take(&mut self.position_index);
909        let position_offsets = if !position_index.is_empty() {
910            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
911            let offsets = postings::build_positions_streaming(
912                position_index,
913                &self.term_interner,
914                &mut *pos_writer,
915            )?;
916            pos_writer.finish()?;
917            offsets
918        } else {
919            FxHashMap::default()
920        };
921
922        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
923        // These are fully independent: different source data, different output files.
924        let inverted_index = std::mem::take(&mut self.inverted_index);
925        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
926        #[cfg(feature = "native")]
927        let store_path = self.store_path.clone();
928        #[cfg(feature = "native")]
929        let num_compression_threads = self.config.num_compression_threads;
930        let compression_level = self.config.compression_level;
931        let dense_vectors = std::mem::take(&mut self.dense_vectors);
932        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
933        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
934        let schema = &self.schema;
935
936        // Pre-create all streaming writers (async) before entering sync rayon scope
937        // Wrapped in OffsetWriter to track bytes written per phase.
938        let mut term_dict_writer =
939            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
940        let mut postings_writer =
941            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
942        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
943        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
944            Some(super::OffsetWriter::new(
945                dir.streaming_writer(&files.vectors).await?,
946            ))
947        } else {
948            None
949        };
950        let mut sparse_writer = if !sparse_vectors.is_empty() {
951            Some(super::OffsetWriter::new(
952                dir.streaming_writer(&files.sparse).await?,
953            ))
954        } else {
955            None
956        };
957        let mut fast_fields = std::mem::take(&mut self.fast_fields);
958        let num_docs = self.next_doc_id;
959        let mut fast_writer = if !fast_fields.is_empty() {
960            Some(super::OffsetWriter::new(
961                dir.streaming_writer(&files.fast).await?,
962            ))
963        } else {
964            None
965        };
966
967        #[cfg(feature = "native")]
968        {
969            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
970                rayon::join(
971                    || {
972                        rayon::join(
973                            || {
974                                postings::build_postings_streaming(
975                                    inverted_index,
976                                    term_interner,
977                                    &position_offsets,
978                                    &mut term_dict_writer,
979                                    &mut postings_writer,
980                                )
981                            },
982                            || {
983                                store::build_store_streaming(
984                                    &store_path,
985                                    num_compression_threads,
986                                    compression_level,
987                                    &mut store_writer,
988                                    num_docs,
989                                )
990                            },
991                        )
992                    },
993                    || {
994                        rayon::join(
995                            || {
996                                rayon::join(
997                                    || -> Result<()> {
998                                        if let Some(ref mut w) = vectors_writer {
999                                            dense::build_vectors_streaming(
1000                                                dense_vectors,
1001                                                binary_dense_vectors,
1002                                                schema,
1003                                                trained,
1004                                                w,
1005                                            )?;
1006                                        }
1007                                        Ok(())
1008                                    },
1009                                    || -> Result<()> {
1010                                        if let Some(ref mut w) = sparse_writer {
1011                                            sparse::build_sparse_streaming(
1012                                                &mut sparse_vectors,
1013                                                schema,
1014                                                w,
1015                                            )?;
1016                                        }
1017                                        Ok(())
1018                                    },
1019                                )
1020                            },
1021                            || -> Result<()> {
1022                                if let Some(ref mut w) = fast_writer {
1023                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
1024                                }
1025                                Ok(())
1026                            },
1027                        )
1028                    },
1029                );
1030            postings_result?;
1031            store_result?;
1032            vectors_result?;
1033            sparse_result?;
1034            fast_result?;
1035        }
1036
1037        #[cfg(not(feature = "native"))]
1038        {
1039            postings::build_postings_streaming(
1040                inverted_index,
1041                term_interner,
1042                &position_offsets,
1043                &mut term_dict_writer,
1044                &mut postings_writer,
1045            )?;
1046            store::build_store_streaming_from_buffer(
1047                &self.store_buffer,
1048                compression_level,
1049                &mut store_writer,
1050                num_docs,
1051            )?;
1052            if let Some(ref mut w) = vectors_writer {
1053                dense::build_vectors_streaming(
1054                    dense_vectors,
1055                    binary_dense_vectors,
1056                    schema,
1057                    trained,
1058                    w,
1059                )?;
1060            }
1061            if let Some(ref mut w) = sparse_writer {
1062                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
1063            }
1064            if let Some(ref mut w) = fast_writer {
1065                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
1066            }
1067        }
1068
1069        let term_dict_bytes = term_dict_writer.offset() as usize;
1070        let postings_bytes = postings_writer.offset() as usize;
1071        let store_bytes = store_writer.offset() as usize;
1072        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
1073        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
1074        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);
1075
1076        term_dict_writer.finish()?;
1077        postings_writer.finish()?;
1078        store_writer.finish()?;
1079        if let Some(w) = vectors_writer {
1080            w.finish()?;
1081        }
1082        if let Some(w) = sparse_writer {
1083            w.finish()?;
1084        }
1085        if let Some(w) = fast_writer {
1086            w.finish()?;
1087        }
1088        drop(position_offsets);
1089        drop(sparse_vectors);
1090
1091        log::info!(
1092            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
1093            num_docs,
1094            super::format_bytes(term_dict_bytes),
1095            super::format_bytes(postings_bytes),
1096            super::format_bytes(store_bytes),
1097            super::format_bytes(vectors_bytes),
1098            super::format_bytes(sparse_bytes),
1099            super::format_bytes(fast_bytes),
1100        );
1101
1102        let meta = SegmentMeta {
1103            id: segment_id.0,
1104            num_docs: self.next_doc_id,
1105            field_stats: self.field_stats.clone(),
1106        };
1107
1108        dir.write(&files.meta, &meta.serialize()?).await?;
1109
1110        // Cleanup temp files
1111        #[cfg(feature = "native")]
1112        {
1113            let _ = std::fs::remove_file(&self.store_path);
1114        }
1115
1116        Ok(meta)
1117    }
1118}
1119
1120/// Serialize all fast-field columns to a `.fast` file.
1121fn build_fast_fields_streaming(
1122    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
1123    num_docs: u32,
1124    writer: &mut dyn Write,
1125) -> Result<()> {
1126    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1127
1128    if fast_fields.is_empty() {
1129        return Ok(());
1130    }
1131
1132    // Sort fields by id for deterministic output
1133    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1134    field_ids.sort_unstable();
1135
1136    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1137    let mut current_offset = 0u64;
1138
1139    for &field_id in &field_ids {
1140        let ff = fast_fields.get_mut(&field_id).unwrap();
1141        ff.pad_to(num_docs);
1142
1143        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1144        toc.field_id = field_id;
1145        current_offset += bytes_written;
1146        toc_entries.push(toc);
1147    }
1148
1149    // Write TOC + footer
1150    let toc_offset = current_offset;
1151    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1152
1153    Ok(())
1154}
1155
1156#[cfg(feature = "native")]
1157impl Drop for SegmentBuilder {
1158    fn drop(&mut self) {
1159        // Cleanup temp files on drop
1160        let _ = std::fs::remove_file(&self.store_path);
1161    }
1162}