Skip to main content

hermes_core/segment/builder/
mod.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
8//! - **Parallel posting serialization**: Rayon parallel sort + serialize
9//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
10
11#[cfg_attr(not(feature = "native"), allow(dead_code))]
12pub(crate) mod bmp;
13mod config;
14mod dense;
15#[cfg(feature = "diagnostics")]
16mod diagnostics;
17#[cfg_attr(not(feature = "native"), allow(dead_code))]
18pub(crate) mod graph_bisection;
19mod postings;
20mod sparse;
21mod store;
22
23pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};
24
25#[cfg(feature = "native")]
26use std::fs::{File, OpenOptions};
27#[cfg(feature = "native")]
28use std::io::BufWriter;
29use std::io::Write;
30use std::mem::size_of;
31#[cfg(feature = "native")]
32use std::path::PathBuf;
33
34use hashbrown::HashMap;
35use rustc_hash::FxHashMap;
36
37// String interning: lasso on native (fast arena), HashMap on WASM (no C deps)
38#[cfg(feature = "native")]
39use lasso::{Rodeo, Spur};
40
41#[cfg(not(feature = "native"))]
42pub(crate) mod simple_interner {
43    use hashbrown::HashMap;
44
45    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
46    pub struct Spur(u32);
47
48    /// Simple string interner for WASM (replaces lasso::Rodeo).
49    /// Stores each string once in a Vec; HashMap maps &str → index.
50    pub struct Rodeo {
51        /// Canonical storage — each string lives here exactly once.
52        strings: Vec<Box<str>>,
53        /// Maps borrowed string slices (pointing into `strings`) to their index.
54        /// Safety: entries are never removed and Box<str> has a stable address.
55        map: HashMap<&'static str, u32>,
56    }
57
58    impl Rodeo {
59        pub fn new() -> Self {
60            Self {
61                strings: Vec::new(),
62                map: HashMap::new(),
63            }
64        }
65
66        pub fn get(&self, key: &str) -> Option<Spur> {
67            self.map.get(key).map(|&id| Spur(id))
68        }
69
70        pub fn get_or_intern(&mut self, key: &str) -> Spur {
71            if let Some(&id) = self.map.get(key) {
72                return Spur(id);
73            }
74            let id = self.strings.len() as u32;
75            let boxed: Box<str> = key.into();
76            // Safety: the Box<str> is stored in self.strings (append-only Vec)
77            // and never moved or freed while the Rodeo is alive.
78            let static_ref: &'static str = unsafe { &*(boxed.as_ref() as *const str) };
79            self.strings.push(boxed);
80            self.map.insert(static_ref, id);
81            Spur(id)
82        }
83
84        pub fn resolve(&self, spur: &Spur) -> &str {
85            &self.strings[spur.0 as usize]
86        }
87
88        pub fn len(&self) -> usize {
89            self.strings.len()
90        }
91    }
92}
93
94#[cfg(not(feature = "native"))]
95use simple_interner::{Rodeo, Spur};
96
97use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
98use std::sync::Arc;
99
100use crate::directories::{Directory, DirectoryWriter};
101use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
102use crate::tokenizer::BoxedTokenizer;
103use crate::{DocId, Result};
104
105use dense::{BinaryDenseVectorBuilder, DenseVectorBuilder};
106use postings::{CompactPosting, PositionPostingListBuilder, PostingListBuilder, TermKey};
107use sparse::SparseVectorBuilder;
108
109/// Size of the document store buffer before writing to disk
110const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
111
112/// Memory overhead per new term in the inverted index:
113/// HashMap entry control byte + padding + TermKey + PostingListBuilder + Vec header
114const NEW_TERM_OVERHEAD: usize = size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
115
116/// Memory overhead per newly interned string: Spur + arena pointers (2 × usize)
117const INTERN_OVERHEAD: usize = size_of::<Spur>() + 2 * size_of::<usize>();
118
119/// Memory overhead per new term in the position index
120const NEW_POS_TERM_OVERHEAD: usize =
121    size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
122
123/// Segment builder with optimized memory usage
124///
125/// Features:
126/// - Streams documents to disk immediately (no in-memory document storage)
127/// - Uses string interning for terms (reduced allocations)
128/// - Uses hashbrown HashMap (faster than BTreeMap)
129pub struct SegmentBuilder {
130    schema: Arc<Schema>,
131    config: SegmentBuilderConfig,
132    tokenizers: FxHashMap<Field, BoxedTokenizer>,
133
134    /// String interner for terms - O(1) lookup and deduplication
135    term_interner: Rodeo,
136
137    /// Inverted index: term key -> posting list
138    inverted_index: HashMap<TermKey, PostingListBuilder>,
139
140    /// Spill file for high-frequency posting lists (lazily created on first spill).
141    #[cfg(feature = "native")]
142    posting_spill_file: Option<BufWriter<File>>,
143    #[cfg(feature = "native")]
144    posting_spill_path: PathBuf,
145    /// Tracks spilled ranges per term key: (file_offset, posting_count).
146    #[cfg(feature = "native")]
147    posting_spill_index: HashMap<TermKey, Vec<(u64, u32)>>,
148    #[cfg(feature = "native")]
149    posting_spill_offset: u64,
150
151    /// Streaming document store writer (native: temp file on disk, WASM: in-memory buffer)
152    #[cfg(feature = "native")]
153    store_file: BufWriter<File>,
154    #[cfg(feature = "native")]
155    store_path: PathBuf,
156    #[cfg(not(feature = "native"))]
157    store_buffer: Vec<u8>,
158
159    /// Document count
160    next_doc_id: DocId,
161
162    /// Per-field statistics for BM25F
163    field_stats: FxHashMap<u32, FieldStats>,
164
165    /// Per-document field lengths stored compactly
166    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
167    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
168    doc_field_lengths: Vec<u32>,
169    num_indexed_fields: usize,
170    field_to_slot: FxHashMap<u32, usize>,
171
172    /// Reusable buffer for per-document term frequency aggregation
173    /// Avoids allocating a new hashmap for each document
174    local_tf_buffer: FxHashMap<Spur, u32>,
175
176    /// Reusable buffer for per-document position tracking (when positions enabled)
177    /// Avoids allocating a new hashmap for each text field per document
178    local_positions: FxHashMap<Spur, Vec<u32>>,
179
180    /// Reusable buffer for tokenization to avoid per-token String allocations
181    token_buffer: String,
182
183    /// Reusable buffer for numeric field term encoding (avoids format!() alloc per call)
184    numeric_buffer: String,
185
186    /// Dense vector storage per field: field -> (doc_ids, vectors)
187    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
188    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
189
190    /// Binary dense vector storage per field: field -> packed-bit vectors
191    binary_dense_vectors: FxHashMap<u32, BinaryDenseVectorBuilder>,
192
193    /// Sparse vector storage per field: field -> SparseVectorBuilder
194    /// Uses proper BlockSparsePostingList with configurable quantization
195    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
196
197    /// Position index for fields with positions enabled
198    /// term key -> position posting list
199    position_index: HashMap<TermKey, PositionPostingListBuilder>,
200
201    /// Fields that have position tracking enabled, with their mode
202    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
203
204    /// Current element ordinal for multi-valued fields (reset per document)
205    current_element_ordinal: FxHashMap<u32, u32>,
206
207    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
208    estimated_memory: usize,
209
210    /// Reusable buffer for document serialization (avoids per-document allocation)
211    doc_serialize_buffer: Vec<u8>,
212
213    /// Fast-field columnar writers per field_id (only for fields with fast=true)
214    fast_fields: FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
215}
216
217impl SegmentBuilder {
218    /// Create a new segment builder
219    pub fn new(schema: Arc<Schema>, config: SegmentBuilderConfig) -> Result<Self> {
220        #[cfg(feature = "native")]
221        let (store_file, store_path, spill_path) = {
222            let segment_id = uuid::Uuid::new_v4();
223            let store_path = config
224                .temp_dir
225                .join(format!("hermes_store_{}.tmp", segment_id));
226            let store_file = BufWriter::with_capacity(
227                STORE_BUFFER_SIZE,
228                OpenOptions::new()
229                    .create(true)
230                    .write(true)
231                    .truncate(true)
232                    .open(&store_path)?,
233            );
234            let spill_path = config
235                .temp_dir
236                .join(format!("hermes_spill_{}.tmp", segment_id));
237            (store_file, store_path, spill_path)
238        };
239
240        // Count indexed fields, track positions, and auto-configure tokenizers
241        let registry = crate::tokenizer::TokenizerRegistry::new();
242        let mut num_indexed_fields = 0;
243        let mut field_to_slot = FxHashMap::default();
244        let mut position_enabled_fields = FxHashMap::default();
245        let mut tokenizers = FxHashMap::default();
246        for (field, entry) in schema.fields() {
247            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
248                field_to_slot.insert(field.0, num_indexed_fields);
249                num_indexed_fields += 1;
250                if entry.positions.is_some() {
251                    position_enabled_fields.insert(field.0, entry.positions);
252                }
253                if let Some(ref tok_name) = entry.tokenizer
254                    && let Some(tokenizer) = registry.get(tok_name)
255                {
256                    tokenizers.insert(field, tokenizer);
257                }
258            }
259        }
260
261        // Initialize fast-field writers for fields with fast=true
262        use crate::structures::fast_field::{FastFieldColumnType, FastFieldWriter};
263        let mut fast_fields = FxHashMap::default();
264        for (field, entry) in schema.fields() {
265            if entry.fast {
266                let writer = if entry.multi {
267                    match entry.field_type {
268                        FieldType::U64 => {
269                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::U64)
270                        }
271                        FieldType::I64 => {
272                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::I64)
273                        }
274                        FieldType::F64 => {
275                            FastFieldWriter::new_numeric_multi(FastFieldColumnType::F64)
276                        }
277                        FieldType::Text => FastFieldWriter::new_text_multi(),
278                        _ => continue,
279                    }
280                } else {
281                    match entry.field_type {
282                        FieldType::U64 => FastFieldWriter::new_numeric(FastFieldColumnType::U64),
283                        FieldType::I64 => FastFieldWriter::new_numeric(FastFieldColumnType::I64),
284                        FieldType::F64 => FastFieldWriter::new_numeric(FastFieldColumnType::F64),
285                        FieldType::Text => FastFieldWriter::new_text(),
286                        _ => continue,
287                    }
288                };
289                fast_fields.insert(field.0, writer);
290            }
291        }
292
293        Ok(Self {
294            schema,
295            tokenizers,
296            term_interner: Rodeo::new(),
297            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
298            #[cfg(feature = "native")]
299            posting_spill_file: None,
300            #[cfg(feature = "native")]
301            posting_spill_path: spill_path,
302            #[cfg(feature = "native")]
303            posting_spill_index: HashMap::new(),
304            #[cfg(feature = "native")]
305            posting_spill_offset: 0,
306            #[cfg(feature = "native")]
307            store_file,
308            #[cfg(feature = "native")]
309            store_path,
310            #[cfg(not(feature = "native"))]
311            store_buffer: Vec::with_capacity(STORE_BUFFER_SIZE),
312            next_doc_id: 0,
313            field_stats: FxHashMap::default(),
314            doc_field_lengths: Vec::new(),
315            num_indexed_fields,
316            field_to_slot,
317            local_tf_buffer: FxHashMap::default(),
318            local_positions: FxHashMap::default(),
319            token_buffer: String::with_capacity(64),
320            numeric_buffer: String::with_capacity(32),
321            config,
322            dense_vectors: FxHashMap::default(),
323            binary_dense_vectors: FxHashMap::default(),
324            sparse_vectors: FxHashMap::default(),
325            position_index: HashMap::new(),
326            position_enabled_fields,
327            current_element_ordinal: FxHashMap::default(),
328            estimated_memory: 0,
329            doc_serialize_buffer: Vec::with_capacity(256),
330            fast_fields,
331        })
332    }
333
334    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
335        self.tokenizers.insert(field, tokenizer);
336    }
337
338    /// Get the current element ordinal for a field and increment it.
339    /// Used for multi-valued fields (text, dense_vector, sparse_vector).
340    fn next_element_ordinal(&mut self, field_id: u32) -> u32 {
341        let ordinal = *self.current_element_ordinal.get(&field_id).unwrap_or(&0);
342        *self.current_element_ordinal.entry(field_id).or_insert(0) += 1;
343        ordinal
344    }
345
346    pub fn num_docs(&self) -> u32 {
347        self.next_doc_id
348    }
349
350    /// Fast O(1) memory estimate - updated incrementally during indexing
351    #[inline]
352    pub fn estimated_memory_bytes(&self) -> usize {
353        self.estimated_memory
354    }
355
356    /// Count total unique sparse dimensions across all fields
357    pub fn sparse_dim_count(&self) -> usize {
358        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
359    }
360
361    /// Get current statistics for debugging performance (expensive - iterates all data)
362    pub fn stats(&self) -> SegmentBuilderStats {
363        use std::mem::size_of;
364
365        let postings_in_memory: usize =
366            self.inverted_index.values().map(|p| p.postings.len()).sum();
367
368        // Size constants computed from actual types
369        let compact_posting_size = size_of::<CompactPosting>();
370        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
371        let term_key_size = size_of::<TermKey>();
372        let posting_builder_size = size_of::<PostingListBuilder>();
373        let spur_size = size_of::<Spur>();
374        let sparse_entry_size = size_of::<(DocId, u16, f32)>();
375
376        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
377        // Measured: ~(key_size + value_size + 8) per entry on average
378        let hashmap_entry_base_overhead = 8usize;
379
380        // FxHashMap uses same layout as hashbrown
381        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;
382
383        // Postings memory
384        let postings_bytes: usize = self
385            .inverted_index
386            .values()
387            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
388            .sum();
389
390        // Inverted index overhead
391        let index_overhead_bytes = self.inverted_index.len()
392            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);
393
394        // Term interner: Rodeo stores strings + metadata
395        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
396        let interner_arena_overhead = 2 * size_of::<usize>();
397        let avg_term_len = 8; // Estimated average term length
398        let interner_bytes =
399            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);
400
401        // Doc field lengths
402        let field_lengths_bytes =
403            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;
404
405        // Dense vectors
406        let mut dense_vectors_bytes: usize = 0;
407        let mut dense_vector_count: usize = 0;
408        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
409        for b in self.dense_vectors.values() {
410            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
411                + b.doc_ids.capacity() * doc_id_ordinal_size
412                + 2 * vec_overhead; // Two Vecs
413            dense_vector_count += b.doc_ids.len();
414        }
415        // Binary dense vectors
416        for b in self.binary_dense_vectors.values() {
417            dense_vectors_bytes += b.vectors.capacity()
418                + b.doc_ids.capacity() * doc_id_ordinal_size
419                + 2 * vec_overhead;
420            dense_vector_count += b.doc_ids.len();
421        }
422
423        // Local buffers
424        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
425        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;
426
427        // Sparse vectors
428        let mut sparse_vectors_bytes: usize = 0;
429        for builder in self.sparse_vectors.values() {
430            for postings in builder.postings.values() {
431                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
432            }
433            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
434            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
435            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
436        }
437        // Outer FxHashMap overhead
438        let outer_sparse_entry_size =
439            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
440        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;
441
442        // Position index
443        let mut position_index_bytes: usize = 0;
444        for pos_builder in self.position_index.values() {
445            for (_, positions) in &pos_builder.postings {
446                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
447            }
448            // Vec<(DocId, Vec<u32>)> entry size
449            let pos_entry_size = size_of::<DocId>() + vec_overhead;
450            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
451        }
452        // HashMap overhead for position_index
453        let pos_index_entry_size =
454            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
455        position_index_bytes += self.position_index.len() * pos_index_entry_size;
456
457        let estimated_memory_bytes = postings_bytes
458            + index_overhead_bytes
459            + interner_bytes
460            + field_lengths_bytes
461            + dense_vectors_bytes
462            + local_tf_buffer_bytes
463            + sparse_vectors_bytes
464            + position_index_bytes;
465
466        let memory_breakdown = MemoryBreakdown {
467            postings_bytes,
468            index_overhead_bytes,
469            interner_bytes,
470            field_lengths_bytes,
471            dense_vectors_bytes,
472            dense_vector_count,
473            sparse_vectors_bytes,
474            position_index_bytes,
475        };
476
477        SegmentBuilderStats {
478            num_docs: self.next_doc_id,
479            unique_terms: self.inverted_index.len(),
480            postings_in_memory,
481            interned_strings: self.term_interner.len(),
482            doc_field_lengths_size: self.doc_field_lengths.len(),
483            estimated_memory_bytes,
484            memory_breakdown,
485        }
486    }
487
488    /// Add a document - streams to disk immediately
489    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
490        let doc_id = self.next_doc_id;
491        self.next_doc_id += 1;
492
493        // Initialize field lengths for this document
494        let base_idx = self.doc_field_lengths.len();
495        self.doc_field_lengths
496            .resize(base_idx + self.num_indexed_fields, 0);
497        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();
498
499        // Reset element ordinals for this document (for multi-valued fields)
500        self.current_element_ordinal.clear();
501
502        for (field, value) in doc.field_values() {
503            let Some(entry) = self.schema.get_field_entry(*field) else {
504                continue;
505            };
506
507            // Dense/binary vectors are written to .vectors when indexed || stored
508            // Other field types require indexed or fast
509            if !matches!(
510                &entry.field_type,
511                FieldType::DenseVector | FieldType::BinaryDenseVector
512            ) && !entry.indexed
513                && !entry.fast
514            {
515                continue;
516            }
517
518            match (&entry.field_type, value) {
519                (FieldType::Text, FieldValue::Text(text)) => {
520                    if entry.indexed {
521                        let element_ordinal = self.next_element_ordinal(field.0);
522                        let token_count =
523                            self.index_text_field(*field, doc_id, text, element_ordinal)?;
524
525                        let stats = self.field_stats.entry(field.0).or_default();
526                        stats.total_tokens += token_count as u64;
527                        if element_ordinal == 0 {
528                            stats.doc_count += 1;
529                        }
530
531                        if let Some(&slot) = self.field_to_slot.get(&field.0) {
532                            self.doc_field_lengths[base_idx + slot] = token_count;
533                        }
534                    }
535
536                    // Fast-field: store raw text for text ordinal column
537                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
538                        ff.add_text(doc_id, text);
539                    }
540                }
541                (FieldType::U64, FieldValue::U64(v)) => {
542                    if entry.indexed {
543                        self.index_numeric_field(*field, doc_id, *v)?;
544                    }
545                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
546                        ff.add_u64(doc_id, *v);
547                    }
548                }
549                (FieldType::I64, FieldValue::I64(v)) => {
550                    if entry.indexed {
551                        self.index_numeric_field(*field, doc_id, *v as u64)?;
552                    }
553                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
554                        ff.add_i64(doc_id, *v);
555                    }
556                }
557                (FieldType::F64, FieldValue::F64(v)) => {
558                    if entry.indexed {
559                        self.index_numeric_field(*field, doc_id, v.to_bits())?;
560                    }
561                    if let Some(ff) = self.fast_fields.get_mut(&field.0) {
562                        ff.add_f64(doc_id, *v);
563                    }
564                }
565                (FieldType::DenseVector, FieldValue::DenseVector(vec))
566                    if entry.indexed || entry.stored =>
567                {
568                    let ordinal = self.next_element_ordinal(field.0);
569                    self.index_dense_vector_field(*field, doc_id, ordinal as u16, vec)?;
570                }
571                (FieldType::BinaryDenseVector, FieldValue::BinaryDenseVector(bytes))
572                    if entry.indexed || entry.stored =>
573                {
574                    let ordinal = self.next_element_ordinal(field.0);
575                    self.index_binary_dense_vector_field(*field, doc_id, ordinal as u16, bytes)?;
576                }
577                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
578                    let ordinal = self.next_element_ordinal(field.0);
579                    self.index_sparse_vector_field(*field, doc_id, ordinal as u16, entries)?;
580                }
581                _ => {}
582            }
583        }
584
585        // Stream document to disk immediately
586        self.write_document_to_store(&doc)?;
587
588        Ok(doc_id)
589    }
590
591    /// Index a text field using interned terms
592    ///
593    /// Uses a custom tokenizer when set for the field (via `set_tokenizer`),
594    /// otherwise falls back to an inline zero-allocation path (split_whitespace
595    /// + lowercase + strip non-alphanumeric).
596    ///
597    /// If position recording is enabled for this field, also records token positions
598    /// encoded as (element_ordinal << 20) | token_position.
599    fn index_text_field(
600        &mut self,
601        field: Field,
602        doc_id: DocId,
603        text: &str,
604        element_ordinal: u32,
605    ) -> Result<u32> {
606        use crate::dsl::PositionMode;
607
608        let field_id = field.0;
609        let position_mode = self
610            .position_enabled_fields
611            .get(&field_id)
612            .copied()
613            .flatten();
614
615        // Phase 1: Aggregate term frequencies within this document
616        // Also collect positions if enabled
617        // Reuse buffers to avoid allocations
618        self.local_tf_buffer.clear();
619        // Clear position Vecs in-place (keeps allocated capacity for reuse)
620        for v in self.local_positions.values_mut() {
621            v.clear();
622        }
623
624        let mut token_position = 0u32;
625
626        // Tokenize: use custom tokenizer if set, else inline zero-alloc path.
627        // The owned Vec<Token> is computed first so the immutable borrow of
628        // self.tokenizers ends before we mutate other fields.
629        let custom_tokens = self.tokenizers.get(&field).map(|t| t.tokenize(text));
630
631        if let Some(tokens) = custom_tokens {
632            // Custom tokenizer path
633            for token in &tokens {
634                let term_spur = if let Some(spur) = self.term_interner.get(&token.text) {
635                    spur
636                } else {
637                    let spur = self.term_interner.get_or_intern(&token.text);
638                    self.estimated_memory += token.text.len() + INTERN_OVERHEAD;
639                    spur
640                };
641                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
642
643                if let Some(mode) = position_mode {
644                    let encoded_pos = match mode {
645                        PositionMode::Ordinal => element_ordinal << 20,
646                        PositionMode::TokenPosition => token.position,
647                        PositionMode::Full => (element_ordinal << 20) | token.position,
648                    };
649                    self.local_positions
650                        .entry(term_spur)
651                        .or_default()
652                        .push(encoded_pos);
653                }
654            }
655            token_position = tokens.len() as u32;
656        } else {
657            // Inline zero-allocation path: split_whitespace + lowercase + strip non-alphanumeric
658            for word in text.split_whitespace() {
659                self.token_buffer.clear();
660                for c in word.chars() {
661                    if c.is_alphanumeric() {
662                        for lc in c.to_lowercase() {
663                            self.token_buffer.push(lc);
664                        }
665                    }
666                }
667
668                if self.token_buffer.is_empty() {
669                    continue;
670                }
671
672                let term_spur = if let Some(spur) = self.term_interner.get(&self.token_buffer) {
673                    spur
674                } else {
675                    let spur = self.term_interner.get_or_intern(&self.token_buffer);
676                    self.estimated_memory += self.token_buffer.len() + INTERN_OVERHEAD;
677                    spur
678                };
679                *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
680
681                if let Some(mode) = position_mode {
682                    let encoded_pos = match mode {
683                        PositionMode::Ordinal => element_ordinal << 20,
684                        PositionMode::TokenPosition => token_position,
685                        PositionMode::Full => (element_ordinal << 20) | token_position,
686                    };
687                    self.local_positions
688                        .entry(term_spur)
689                        .or_default()
690                        .push(encoded_pos);
691                }
692
693                token_position += 1;
694            }
695        }
696
697        // Phase 2: Insert aggregated terms into inverted index
698        // Now we only do one inverted_index lookup per unique term in doc
699        for (&term_spur, &tf) in &self.local_tf_buffer {
700            let term_key = TermKey {
701                field: field_id,
702                term: term_spur,
703            };
704
705            match self.inverted_index.entry(term_key) {
706                hashbrown::hash_map::Entry::Occupied(mut o) => {
707                    o.get_mut().add(doc_id, tf);
708                    self.estimated_memory += size_of::<CompactPosting>();
709                    // Spill large posting lists to disk to reduce peak memory
710                    #[cfg(feature = "native")]
711                    if o.get().should_spill() {
712                        use byteorder::{LittleEndian, WriteBytesExt};
713
714                        let builder = o.get_mut();
715                        let count = builder.postings.len() as u32;
716                        let offset = self.posting_spill_offset;
717
718                        // Lazily create the spill file on first spill
719                        let spill_file = if let Some(ref mut f) = self.posting_spill_file {
720                            f
721                        } else {
722                            self.posting_spill_file = Some(BufWriter::with_capacity(
723                                256 * 1024,
724                                OpenOptions::new()
725                                    .create(true)
726                                    .write(true)
727                                    .truncate(true)
728                                    .open(&self.posting_spill_path)?,
729                            ));
730                            self.posting_spill_file.as_mut().unwrap()
731                        };
732                        for p in &builder.postings {
733                            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
734                            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
735                        }
736                        self.posting_spill_offset += count as u64 * 6;
737                        self.posting_spill_index
738                            .entry(term_key)
739                            .or_default()
740                            .push((offset, count));
741
742                        let freed = builder.postings.len() * size_of::<CompactPosting>();
743                        builder.spilled_count += count;
744                        builder.postings.clear();
745                        self.estimated_memory -= freed;
746                    }
747                }
748                hashbrown::hash_map::Entry::Vacant(v) => {
749                    let mut posting = PostingListBuilder::new();
750                    posting.add(doc_id, tf);
751                    v.insert(posting);
752                    self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
753                }
754            }
755
756            if position_mode.is_some()
757                && let Some(positions) = self.local_positions.get(&term_spur)
758            {
759                match self.position_index.entry(term_key) {
760                    hashbrown::hash_map::Entry::Occupied(mut o) => {
761                        for &pos in positions {
762                            o.get_mut().add_position(doc_id, pos);
763                        }
764                        self.estimated_memory += positions.len() * size_of::<u32>();
765                    }
766                    hashbrown::hash_map::Entry::Vacant(v) => {
767                        let mut pos_posting = PositionPostingListBuilder::new();
768                        for &pos in positions {
769                            pos_posting.add_position(doc_id, pos);
770                        }
771                        self.estimated_memory +=
772                            positions.len() * size_of::<u32>() + NEW_POS_TERM_OVERHEAD;
773                        v.insert(pos_posting);
774                    }
775                }
776            }
777        }
778
779        Ok(token_position)
780    }
781
782    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
783        use std::fmt::Write;
784
785        self.numeric_buffer.clear();
786        write!(self.numeric_buffer, "__num_{}", value).unwrap();
787        let term_spur = if let Some(spur) = self.term_interner.get(&self.numeric_buffer) {
788            spur
789        } else {
790            let spur = self.term_interner.get_or_intern(&self.numeric_buffer);
791            self.estimated_memory += self.numeric_buffer.len() + INTERN_OVERHEAD;
792            spur
793        };
794
795        let term_key = TermKey {
796            field: field.0,
797            term: term_spur,
798        };
799
800        match self.inverted_index.entry(term_key) {
801            hashbrown::hash_map::Entry::Occupied(mut o) => {
802                o.get_mut().add(doc_id, 1);
803                self.estimated_memory += size_of::<CompactPosting>();
804            }
805            hashbrown::hash_map::Entry::Vacant(v) => {
806                let mut posting = PostingListBuilder::new();
807                posting.add(doc_id, 1);
808                v.insert(posting);
809                self.estimated_memory += size_of::<CompactPosting>() + NEW_TERM_OVERHEAD;
810            }
811        }
812
813        Ok(())
814    }
815
816    /// Index a dense vector field with ordinal tracking
817    fn index_dense_vector_field(
818        &mut self,
819        field: Field,
820        doc_id: DocId,
821        ordinal: u16,
822        vector: &[f32],
823    ) -> Result<()> {
824        let dim = vector.len();
825
826        let builder = self
827            .dense_vectors
828            .entry(field.0)
829            .or_insert_with(|| DenseVectorBuilder::new(dim));
830
831        // Verify dimension consistency
832        if builder.dim != dim && builder.len() > 0 {
833            return Err(crate::Error::Schema(format!(
834                "Dense vector dimension mismatch: expected {}, got {}",
835                builder.dim, dim
836            )));
837        }
838
839        builder.add(doc_id, ordinal, vector);
840
841        self.estimated_memory += std::mem::size_of_val(vector) + size_of::<(DocId, u16)>();
842
843        Ok(())
844    }
845
846    /// Index a binary dense vector field with ordinal tracking
847    fn index_binary_dense_vector_field(
848        &mut self,
849        field: Field,
850        doc_id: DocId,
851        ordinal: u16,
852        bytes: &[u8],
853    ) -> Result<()> {
854        let dim_bits = self
855            .schema
856            .get_field_entry(field)
857            .and_then(|e| e.binary_dense_vector_config.as_ref())
858            .map(|c| c.dim)
859            .ok_or_else(|| {
860                crate::Error::Schema("BinaryDenseVector field missing config".to_string())
861            })?;
862
863        let expected_byte_len = dim_bits.div_ceil(8);
864        if bytes.len() != expected_byte_len {
865            return Err(crate::Error::Schema(format!(
866                "Binary vector byte length mismatch: expected {} (dim={}), got {}",
867                expected_byte_len,
868                dim_bits,
869                bytes.len()
870            )));
871        }
872
873        let builder = self
874            .binary_dense_vectors
875            .entry(field.0)
876            .or_insert_with(|| BinaryDenseVectorBuilder::new(dim_bits));
877
878        builder.add(doc_id, ordinal, bytes);
879        self.estimated_memory += bytes.len() + size_of::<(DocId, u16)>();
880
881        Ok(())
882    }
883
884    /// Index a sparse vector field using dedicated sparse posting lists
885    ///
886    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
887    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
888    ///
889    /// Weights below the configured `weight_threshold` are not indexed.
890    fn index_sparse_vector_field(
891        &mut self,
892        field: Field,
893        doc_id: DocId,
894        ordinal: u16,
895        entries: &[(u32, f32)],
896    ) -> Result<()> {
897        // Get weight threshold from field config (default 0.0 = no filtering)
898        let weight_threshold = self
899            .schema
900            .get_field_entry(field)
901            .and_then(|entry| entry.sparse_vector_config.as_ref())
902            .map(|config| config.weight_threshold)
903            .unwrap_or(0.0);
904
905        let builder = self
906            .sparse_vectors
907            .entry(field.0)
908            .or_insert_with(SparseVectorBuilder::new);
909
910        builder.inc_vector_count();
911
912        for &(dim_id, weight) in entries {
913            // Skip weights below threshold
914            if weight.abs() < weight_threshold {
915                continue;
916            }
917
918            let is_new_dim = !builder.postings.contains_key(&dim_id);
919            builder.add(dim_id, doc_id, ordinal, weight);
920            self.estimated_memory += size_of::<(DocId, u16, f32)>();
921            if is_new_dim {
922                // HashMap entry overhead + Vec header
923                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
924            }
925        }
926
927        Ok(())
928    }
929
930    /// Write document to streaming store (reuses internal buffer to avoid per-doc allocation)
931    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
932        use byteorder::{LittleEndian, WriteBytesExt};
933
934        super::store::serialize_document_into(doc, &self.schema, &mut self.doc_serialize_buffer)?;
935
936        #[cfg(feature = "native")]
937        {
938            self.store_file
939                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
940            self.store_file.write_all(&self.doc_serialize_buffer)?;
941        }
942        #[cfg(not(feature = "native"))]
943        {
944            self.store_buffer
945                .write_u32::<LittleEndian>(self.doc_serialize_buffer.len() as u32)?;
946            self.store_buffer.write_all(&self.doc_serialize_buffer)?;
947        }
948
949        Ok(())
950    }
951
    /// Build the final segment, consuming the builder.
    ///
    /// Streams all data directly to disk via StreamingWriter to avoid buffering
    /// entire serialized outputs in memory. Each phase consumes and drops its
    /// source data before the next phase begins.
    ///
    /// Steps, in order:
    /// 1. Positions are written first (only when any were collected); the
    ///    resulting offsets are passed to the postings build.
    /// 2. Term dictionary + postings, document store, dense/binary vectors,
    ///    sparse vectors and fast fields are written. With the `native` feature
    ///    these run as five tasks under nested `rayon::join`; without it they
    ///    run one after another.
    /// 3. All writers are finished, a size summary is logged, and the segment
    ///    metadata is written through `dir.write`.
    ///
    /// # Arguments
    /// * `dir` - destination of every segment file (`streaming_writer` / `write`).
    /// * `segment_id` - determines the file names (via `SegmentFiles::new`) and
    ///   is recorded as the metadata id.
    /// * `trained` - forwarded unchanged to `dense::build_vectors_streaming`.
    ///
    /// # Returns
    /// The `SegmentMeta` that was written: the segment id, `next_doc_id` as the
    /// document count, and a clone of the field stats.
    ///
    /// # Errors
    /// Returns the first error propagated by flushing, writer creation, any
    /// build step, finishing a writer, or serializing/writing the metadata. On
    /// the native path the task results are inspected only after `rayon::join`
    /// has returned all of them.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Flush buffered store writes so the temp store file is complete before
        // it is read back during the store build.
        #[cfg(feature = "native")]
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Phase 1: Stream positions directly to disk (consumes position_index).
        // The positions file is only created when there is position data.
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = postings::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Phase 2: build postings, store, dense/binary vectors, sparse vectors
        // and fast fields (five parallel tasks on native, sequential otherwise).
        // These are fully independent: different source data, different output files.
        // Source collections are moved out of `self` so each task owns its input.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        #[cfg(feature = "native")]
        let store_path = self.store_path.clone();
        #[cfg(feature = "native")]
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let binary_dense_vectors = std::mem::take(&mut self.binary_dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Pre-create all streaming writers (async) before entering sync rayon scope
        // Wrapped in OffsetWriter to track bytes written per phase.
        let mut term_dict_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);
        let mut postings_writer =
            super::OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut store_writer = super::OffsetWriter::new(dir.streaming_writer(&files.store).await?);
        // The vectors, sparse and fast-field files are optional: a writer (and
        // therefore a file) is only created when the matching data is non-empty.
        let mut vectors_writer = if !dense_vectors.is_empty() || !binary_dense_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.vectors).await?,
            ))
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.sparse).await?,
            ))
        } else {
            None
        };
        let mut fast_fields = std::mem::take(&mut self.fast_fields);
        let num_docs = self.next_doc_id;
        let mut fast_writer = if !fast_fields.is_empty() {
            Some(super::OffsetWriter::new(
                dir.streaming_writer(&files.fast).await?,
            ))
        } else {
            None
        };

        #[cfg(feature = "native")]
        {
            // Flush spilled postings, then reopen the spill file for reading.
            // The reader is only created when at least one term was spilled.
            if let Some(ref mut f) = self.posting_spill_file {
                f.flush()?;
            }
            let posting_spill_index = std::mem::take(&mut self.posting_spill_index);
            let mut spill_reader_opt = if !posting_spill_index.is_empty() {
                let spill_file = std::fs::File::open(&self.posting_spill_path)?;
                Some((std::io::BufReader::new(spill_file), posting_spill_index))
            } else {
                None
            };

            // Join tree: (postings | store) alongside ((vectors | sparse) | fast fields).
            let ((postings_result, store_result), ((vectors_result, sparse_result), fast_result)) =
                rayon::join(
                    || {
                        rayon::join(
                            || {
                                // Hand the spill reader and its index to the postings
                                // build as borrowed references, if spilling happened.
                                let spill_arg = spill_reader_opt.as_mut().map(|(r, idx)| {
                                    (
                                        r as &mut std::io::BufReader<std::fs::File>,
                                        idx as &postings::SpillIndex,
                                    )
                                });
                                postings::build_postings_streaming(
                                    inverted_index,
                                    term_interner,
                                    &position_offsets,
                                    &mut term_dict_writer,
                                    &mut postings_writer,
                                    spill_arg,
                                )
                            },
                            || {
                                store::build_store_streaming(
                                    &store_path,
                                    num_compression_threads,
                                    compression_level,
                                    &mut store_writer,
                                    num_docs,
                                )
                            },
                        )
                    },
                    || {
                        rayon::join(
                            || {
                                rayon::join(
                                    || -> Result<()> {
                                        if let Some(ref mut w) = vectors_writer {
                                            dense::build_vectors_streaming(
                                                dense_vectors,
                                                binary_dense_vectors,
                                                schema,
                                                trained,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                    || -> Result<()> {
                                        if let Some(ref mut w) = sparse_writer {
                                            sparse::build_sparse_streaming(
                                                &mut sparse_vectors,
                                                schema,
                                                w,
                                            )?;
                                        }
                                        Ok(())
                                    },
                                )
                            },
                            || -> Result<()> {
                                if let Some(ref mut w) = fast_writer {
                                    build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
                                }
                                Ok(())
                            },
                        )
                    },
                );
            // Surface task failures only now, after every task has returned.
            postings_result?;
            store_result?;
            vectors_result?;
            sparse_result?;
            fast_result?;
        }

        // Without the `native` feature the same steps run sequentially, and the
        // store is built from the in-memory `store_buffer` instead of a temp file.
        #[cfg(not(feature = "native"))]
        {
            postings::build_postings_streaming(
                inverted_index,
                term_interner,
                &position_offsets,
                &mut term_dict_writer,
                &mut postings_writer,
            )?;
            store::build_store_streaming_from_buffer(
                &self.store_buffer,
                compression_level,
                &mut store_writer,
                num_docs,
            )?;
            if let Some(ref mut w) = vectors_writer {
                dense::build_vectors_streaming(
                    dense_vectors,
                    binary_dense_vectors,
                    schema,
                    trained,
                    w,
                )?;
            }
            if let Some(ref mut w) = sparse_writer {
                sparse::build_sparse_streaming(&mut sparse_vectors, schema, w)?;
            }
            if let Some(ref mut w) = fast_writer {
                build_fast_fields_streaming(&mut fast_fields, num_docs, w)?;
            }
        }

        // Read the per-file byte counts before the writers are finished; files
        // that were never created are reported as 0 bytes.
        let term_dict_bytes = term_dict_writer.offset() as usize;
        let postings_bytes = postings_writer.offset() as usize;
        let store_bytes = store_writer.offset() as usize;
        let vectors_bytes = vectors_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let sparse_bytes = sparse_writer.as_ref().map_or(0, |w| w.offset() as usize);
        let fast_bytes = fast_writer.as_ref().map_or(0, |w| w.offset() as usize);

        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        if let Some(w) = fast_writer {
            w.finish()?;
        }
        // Release the remaining build inputs before logging and writing metadata.
        drop(position_offsets);
        drop(sparse_vectors);

        log::info!(
            "[segment_build] {} docs: term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            num_docs,
            super::format_bytes(term_dict_bytes),
            super::format_bytes(postings_bytes),
            super::format_bytes(store_bytes),
            super::format_bytes(vectors_bytes),
            super::format_bytes(sparse_bytes),
            super::format_bytes(fast_bytes),
        );

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files (best effort: a failed removal is ignored)
        #[cfg(feature = "native")]
        {
            let _ = std::fs::remove_file(&self.store_path);
        }

        Ok(meta)
    }
1198}
1199
1200/// Serialize all fast-field columns to a `.fast` file.
1201fn build_fast_fields_streaming(
1202    fast_fields: &mut FxHashMap<u32, crate::structures::fast_field::FastFieldWriter>,
1203    num_docs: u32,
1204    writer: &mut dyn Write,
1205) -> Result<()> {
1206    use crate::structures::fast_field::{FastFieldTocEntry, write_fast_field_toc_and_footer};
1207
1208    if fast_fields.is_empty() {
1209        return Ok(());
1210    }
1211
1212    // Sort fields by id for deterministic output
1213    let mut field_ids: Vec<u32> = fast_fields.keys().copied().collect();
1214    field_ids.sort_unstable();
1215
1216    let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(field_ids.len());
1217    let mut current_offset = 0u64;
1218
1219    for &field_id in &field_ids {
1220        let ff = fast_fields.get_mut(&field_id).unwrap();
1221        ff.pad_to(num_docs);
1222
1223        let (mut toc, bytes_written) = ff.serialize(writer, current_offset)?;
1224        toc.field_id = field_id;
1225        current_offset += bytes_written;
1226        toc_entries.push(toc);
1227    }
1228
1229    // Write TOC + footer
1230    let toc_offset = current_offset;
1231    write_fast_field_toc_and_footer(writer, toc_offset, &toc_entries)?;
1232
1233    Ok(())
1234}
1235
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    /// Best-effort removal of the builder's temporary files.
    ///
    /// The store file at `store_path` is always removed. The posting spill file
    /// is removed only when a spill writer was opened. Removal errors are ignored.
    fn drop(&mut self) {
        let spill_was_opened = self.posting_spill_file.is_some();

        let _ = std::fs::remove_file(&self.store_path);
        if spill_was_opened {
            let _ = std::fs::remove_file(&self.posting_spill_path);
        }
    }
}