Skip to main content

hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14
15use hashbrown::HashMap;
16use lasso::{Rodeo, Spur};
17use rayon::prelude::*;
18use rustc_hash::FxHashMap;
19
20use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
21use crate::compression::CompressionLevel;
22use crate::directories::{Directory, DirectoryWriter};
23use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
24use crate::structures::{PostingList, SSTableWriter, TermInfo};
25use crate::tokenizer::BoxedTokenizer;
26use crate::{DocId, Result};
27
28// Re-export from vector_data for backwards compatibility
29pub use super::vector_data::{FlatVectorData, IVFRaBitQIndexData, ScaNNIndexData};
30
/// Capacity, in bytes, of the `BufWriter` that `SegmentBuilder::new` wraps
/// around the temporary document-store file. Serialized documents accumulate
/// in this buffer before being written to disk.
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16 MiB
33
/// Key of the in-memory inverted index and position index: a
/// (field id, interned term) pair.
///
/// Both members are small `Copy` values, so the key can be built and passed
/// around freely without touching the term's string data.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    /// Numeric field id (the `.0` of a `Field`)
    field: u32,
    /// Handle of the term inside the builder's `Rodeo` interner
    term: Spur,
}
40
41/// Compact posting entry for in-memory storage
42#[derive(Clone, Copy)]
43struct CompactPosting {
44    doc_id: DocId,
45    term_freq: u16,
46}
47
48/// In-memory posting list for a term
49struct PostingListBuilder {
50    /// In-memory postings
51    postings: Vec<CompactPosting>,
52}
53
54impl PostingListBuilder {
55    fn new() -> Self {
56        Self {
57            postings: Vec::new(),
58        }
59    }
60
61    /// Add a posting, merging if same doc_id as last
62    #[inline]
63    fn add(&mut self, doc_id: DocId, term_freq: u32) {
64        // Check if we can merge with the last posting
65        if let Some(last) = self.postings.last_mut()
66            && last.doc_id == doc_id
67        {
68            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
69            return;
70        }
71        self.postings.push(CompactPosting {
72            doc_id,
73            term_freq: term_freq.min(u16::MAX as u32) as u16,
74        });
75    }
76
77    fn len(&self) -> usize {
78        self.postings.len()
79    }
80}
81
82/// In-memory position posting list for a term (for fields with record_positions=true)
83struct PositionPostingListBuilder {
84    /// Doc ID -> list of positions (encoded as element_ordinal << 20 | token_position)
85    postings: Vec<(DocId, Vec<u32>)>,
86}
87
88impl PositionPostingListBuilder {
89    fn new() -> Self {
90        Self {
91            postings: Vec::new(),
92        }
93    }
94
95    /// Add a position for a document
96    #[inline]
97    fn add_position(&mut self, doc_id: DocId, position: u32) {
98        if let Some((last_doc, positions)) = self.postings.last_mut()
99            && *last_doc == doc_id
100        {
101            positions.push(position);
102            return;
103        }
104        self.postings.push((doc_id, vec![position]));
105    }
106}
107
/// Intermediate result for parallel posting serialization.
///
/// A term's postings end up in one of two shapes: carried directly inside the
/// term's `TermInfo`, or serialized to a byte buffer of their own.
// NOTE(review): the code that produces and consumes this enum is outside the
// section visible here — confirm the exact inline-size rule there.
enum SerializedPosting {
    /// Inline posting (small enough to fit in TermInfo)
    Inline(TermInfo),
    /// External posting: `bytes` is the serialized posting list and
    /// `doc_count` the number of documents it holds
    External { bytes: Vec<u8>, doc_count: u32 },
}
115
/// Statistics for debugging segment builder performance.
///
/// A point-in-time snapshot produced by `SegmentBuilder::stats`; all values
/// describe the builder's in-memory state at the moment of the call.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index (distinct field/term pairs)
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Length (element count) of the flat `doc_field_lengths` vector
    pub doc_field_lengths_size: usize,
    /// Estimated total memory usage in bytes (sum of the per-component
    /// estimates plus the reusable term-frequency buffer)
    pub estimated_memory_bytes: usize,
    /// Memory breakdown by component
    pub memory_breakdown: MemoryBreakdown,
}
134
/// Detailed memory breakdown by component.
///
/// Every `*_bytes` field is an estimate computed by `SegmentBuilder::stats`
/// from container capacities and fixed per-entry overhead guesses; none of
/// them is an exact allocator measurement.
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    /// Postings memory (CompactPosting structs)
    pub postings_bytes: usize,
    /// Inverted index HashMap overhead
    pub index_overhead_bytes: usize,
    /// Term interner memory (estimated from the number of interned strings)
    pub interner_bytes: usize,
    /// Document field lengths
    pub field_lengths_bytes: usize,
    /// Dense vector storage
    pub dense_vectors_bytes: usize,
    /// Number of dense vectors
    pub dense_vector_count: usize,
    /// Sparse vector storage
    pub sparse_vectors_bytes: usize,
    /// Position index storage
    pub position_index_bytes: usize,
}
155
/// Configuration for segment builder
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files (the streaming document store is
    /// created here as `hermes_store_<uuid>.tmp`)
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    // NOTE(review): `SegmentBuilder::new` creates the interner with
    // `Rodeo::new()` and does not read this value in the code visible here —
    // confirm whether it is meant to be applied.
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    /// Defaults: the OS temp directory, compression level 7, one compression
    /// thread per CPU reported by `num_cpus`, and capacities sized for
    /// 1,000,000 interned terms / 500,000 posting lists.
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
182
/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
///
/// Typical lifecycle: `new` → `add_document` (repeatedly) → `build`, which
/// consumes the builder and writes the segment files.
pub struct SegmentBuilder {
    /// Schema describing the fields of incoming documents
    schema: Schema,
    /// Builder configuration (temp dir, compression, capacities)
    config: SegmentBuilderConfig,
    /// Per-field tokenizers registered through `set_tokenizer`
    // NOTE(review): the text-indexing path visible in this section tokenizes
    // with `split_whitespace` + lowercasing and never consults this map —
    // confirm whether registered tokenizers are used elsewhere.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    /// Path of the temporary store file behind `store_file`
    store_path: PathBuf,

    /// Document count; also the id handed to the next added document
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields, i.e. the row width of `doc_field_lengths`
    num_indexed_fields: usize,
    /// Field id -> column index within a document's row of `doc_field_lengths`
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    // NOTE(review): initialised to 0 in `new`; no code in the section visible
    // here updates it afterwards — confirm where it is maintained.
    estimated_memory: usize,
}
245
246/// Builder for dense vector index using RaBitQ
247struct DenseVectorBuilder {
248    /// Dimension of vectors
249    dim: usize,
250    /// Document IDs with vectors
251    doc_ids: Vec<DocId>,
252    /// Flat vector storage (doc_ids.len() * dim floats)
253    vectors: Vec<f32>,
254}
255
256impl DenseVectorBuilder {
257    fn new(dim: usize) -> Self {
258        Self {
259            dim,
260            doc_ids: Vec::new(),
261            vectors: Vec::new(),
262        }
263    }
264
265    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
266        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
267        self.doc_ids.push(doc_id);
268        self.vectors.extend_from_slice(vector);
269    }
270
271    fn len(&self) -> usize {
272        self.doc_ids.len()
273    }
274
275    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
276    fn get_vectors(&self) -> Vec<Vec<f32>> {
277        self.doc_ids
278            .iter()
279            .enumerate()
280            .map(|(i, _)| {
281                let start = i * self.dim;
282                self.vectors[start..start + self.dim].to_vec()
283            })
284            .collect()
285    }
286
287    /// Get vectors trimmed to specified dimension for matryoshka/MRL indexing
288    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
289        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
290        self.doc_ids
291            .iter()
292            .enumerate()
293            .map(|(i, _)| {
294                let start = i * self.dim;
295                self.vectors[start..start + trim_dim].to_vec()
296            })
297            .collect()
298    }
299}
300
301/// Builder for sparse vector index using BlockSparsePostingList
302///
303/// Collects (doc_id, weight) postings per dimension, then builds
304/// BlockSparsePostingList with proper quantization during commit.
305struct SparseVectorBuilder {
306    /// Postings per dimension: dim_id -> Vec<(doc_id, weight)>
307    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
308}
309
310impl SparseVectorBuilder {
311    fn new() -> Self {
312        Self {
313            postings: FxHashMap::default(),
314        }
315    }
316
317    /// Add a sparse vector entry
318    #[inline]
319    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
320        self.postings
321            .entry(dim_id)
322            .or_default()
323            .push((doc_id, weight));
324    }
325
326    fn is_empty(&self) -> bool {
327        self.postings.is_empty()
328    }
329}
330
331impl SegmentBuilder {
332    /// Create a new segment builder
333    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
334        let segment_id = uuid::Uuid::new_v4();
335        let store_path = config
336            .temp_dir
337            .join(format!("hermes_store_{}.tmp", segment_id));
338
339        let store_file = BufWriter::with_capacity(
340            STORE_BUFFER_SIZE,
341            OpenOptions::new()
342                .create(true)
343                .write(true)
344                .truncate(true)
345                .open(&store_path)?,
346        );
347
348        // Count indexed fields for compact field length storage
349        // Also track which fields have position recording enabled
350        let mut num_indexed_fields = 0;
351        let mut field_to_slot = FxHashMap::default();
352        let mut position_enabled_fields = FxHashMap::default();
353        for (field, entry) in schema.fields() {
354            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
355                field_to_slot.insert(field.0, num_indexed_fields);
356                num_indexed_fields += 1;
357                if entry.positions.is_some() {
358                    position_enabled_fields.insert(field.0, entry.positions);
359                }
360            }
361        }
362
363        Ok(Self {
364            schema,
365            tokenizers: FxHashMap::default(),
366            term_interner: Rodeo::new(),
367            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
368            store_file,
369            store_path,
370            next_doc_id: 0,
371            field_stats: FxHashMap::default(),
372            doc_field_lengths: Vec::new(),
373            num_indexed_fields,
374            field_to_slot,
375            local_tf_buffer: FxHashMap::default(),
376            token_buffer: String::with_capacity(64),
377            config,
378            dense_vectors: FxHashMap::default(),
379            sparse_vectors: FxHashMap::default(),
380            position_index: HashMap::new(),
381            position_enabled_fields,
382            current_element_ordinal: FxHashMap::default(),
383            estimated_memory: 0,
384        })
385    }
386
    /// Register `tokenizer` for `field`, replacing any tokenizer previously
    /// registered for that field.
    // NOTE(review): the text-indexing path visible in this section does not
    // read `self.tokenizers` — confirm where registered tokenizers take effect.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
390
    /// Number of documents added so far (equal to the id the next document
    /// will receive).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
394
    /// Fast O(1) memory estimate - returns the cached `estimated_memory`
    /// counter without inspecting any container.
    // NOTE(review): the counter is set to 0 in `new` and no code visible in
    // this section increments it, so this may always report 0 — confirm where
    // the incremental updates happen. `stats()` computes a full estimate.
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }
400
401    /// Get current statistics for debugging performance (expensive - iterates all data)
402    pub fn stats(&self) -> SegmentBuilderStats {
403        use std::mem::size_of;
404
405        let postings_in_memory: usize =
406            self.inverted_index.values().map(|p| p.postings.len()).sum();
407
408        // Precise memory calculation using actual struct sizes
409        // CompactPosting: doc_id (u32) + term_freq (u16) = 6 bytes, but may have padding
410        let compact_posting_size = size_of::<CompactPosting>();
411
412        // Postings: actual Vec capacity * element size + Vec overhead (24 bytes on 64-bit)
413        let postings_bytes: usize = self
414            .inverted_index
415            .values()
416            .map(|p| {
417                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
418            })
419            .sum();
420
421        // Inverted index: HashMap overhead per entry
422        // Each entry: TermKey (field u32 + Spur 4 bytes = 8 bytes) + PostingListBuilder + HashMap bucket overhead
423        // HashMap typically uses ~1.5x capacity, each bucket ~16-24 bytes
424        let term_key_size = size_of::<TermKey>();
425        let posting_builder_size = size_of::<PostingListBuilder>();
426        let hashmap_entry_overhead = 24; // bucket pointer + metadata
427        let index_overhead_bytes = self.inverted_index.len()
428            * (term_key_size + posting_builder_size + hashmap_entry_overhead);
429
430        // Term interner: Rodeo stores strings + metadata
431        // Each interned string: actual string bytes + Spur (4 bytes) + internal overhead (~16 bytes)
432        // We can't get exact string lengths, so estimate average term length of 8 bytes
433        let avg_term_len = 8;
434        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
435        let interner_bytes =
436            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);
437
438        // Doc field lengths: Vec<u32> with capacity
439        let field_lengths_bytes =
440            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
441
442        // Dense vectors: actual capacity used
443        let mut dense_vectors_bytes: usize = 0;
444        let mut dense_vector_count: usize = 0;
445        for b in self.dense_vectors.values() {
446            // vectors: Vec<f32> capacity + doc_ids: Vec<DocId> capacity
447            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
448                + b.doc_ids.capacity() * size_of::<DocId>()
449                + size_of::<Vec<f32>>()
450                + size_of::<Vec<DocId>>();
451            dense_vector_count += b.doc_ids.len();
452        }
453
454        // Local buffers
455        let local_tf_buffer_bytes =
456            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);
457
458        // Sparse vectors: FxHashMap<u32, SparseVectorBuilder> where each builder has
459        // postings: FxHashMap<u32, Vec<(DocId, f32)>>
460        let mut sparse_vectors_bytes: usize = 0;
461        for builder in self.sparse_vectors.values() {
462            // Each SparseVectorBuilder has a FxHashMap<u32, Vec<(DocId, f32)>>
463            for postings in builder.postings.values() {
464                // Vec<(DocId, f32)> = Vec<(u32, f32)> = 8 bytes per entry
465                sparse_vectors_bytes += postings.capacity() * 8 + size_of::<Vec<(DocId, f32)>>();
466            }
467            // HashMap overhead: ~40 bytes per entry
468            sparse_vectors_bytes += builder.postings.len() * 40;
469        }
470        // Outer HashMap overhead
471        sparse_vectors_bytes += self.sparse_vectors.len() * 40;
472
473        // Position index: HashMap<TermKey, PositionPostingListBuilder>
474        // Each PositionPostingListBuilder has postings: Vec<(DocId, Vec<u32>)>
475        let mut position_index_bytes: usize = 0;
476        for pos_builder in self.position_index.values() {
477            for (_, positions) in &pos_builder.postings {
478                // Vec<u32> capacity
479                position_index_bytes +=
480                    positions.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
481            }
482            // Vec<(DocId, Vec<u32>)> overhead
483            position_index_bytes +=
484                pos_builder.postings.capacity() * (size_of::<DocId>() + size_of::<Vec<u32>>());
485        }
486        // HashMap overhead
487        position_index_bytes += self.position_index.len() * (size_of::<TermKey>() + 40);
488
489        let estimated_memory_bytes = postings_bytes
490            + index_overhead_bytes
491            + interner_bytes
492            + field_lengths_bytes
493            + dense_vectors_bytes
494            + local_tf_buffer_bytes
495            + sparse_vectors_bytes
496            + position_index_bytes;
497
498        let memory_breakdown = MemoryBreakdown {
499            postings_bytes,
500            index_overhead_bytes,
501            interner_bytes,
502            field_lengths_bytes,
503            dense_vectors_bytes,
504            dense_vector_count,
505            sparse_vectors_bytes,
506            position_index_bytes,
507        };
508
509        SegmentBuilderStats {
510            num_docs: self.next_doc_id,
511            unique_terms: self.inverted_index.len(),
512            postings_in_memory,
513            interned_strings: self.term_interner.len(),
514            doc_field_lengths_size: self.doc_field_lengths.len(),
515            estimated_memory_bytes,
516            memory_breakdown,
517        }
518    }
519
520    /// Add a document - streams to disk immediately
521    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
522        let doc_id = self.next_doc_id;
523        self.next_doc_id += 1;
524
525        // Initialize field lengths for this document
526        let base_idx = self.doc_field_lengths.len();
527        self.doc_field_lengths
528            .resize(base_idx + self.num_indexed_fields, 0);
529
530        // Reset element ordinals for this document (for multi-valued fields)
531        self.current_element_ordinal.clear();
532
533        for (field, value) in doc.field_values() {
534            let entry = self.schema.get_field_entry(*field);
535            if entry.is_none() || !entry.unwrap().indexed {
536                continue;
537            }
538
539            let entry = entry.unwrap();
540            match (&entry.field_type, value) {
541                (FieldType::Text, FieldValue::Text(text)) => {
542                    // Get current element ordinal for multi-valued fields
543                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
544                    let token_count =
545                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
546                    // Increment element ordinal for next value of this field
547                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
548
549                    // Update field statistics
550                    let stats = self.field_stats.entry(field.0).or_default();
551                    stats.total_tokens += token_count as u64;
552                    stats.doc_count += 1;
553
554                    // Store field length compactly
555                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
556                        self.doc_field_lengths[base_idx + slot] = token_count;
557                    }
558                }
559                (FieldType::U64, FieldValue::U64(v)) => {
560                    self.index_numeric_field(*field, doc_id, *v)?;
561                }
562                (FieldType::I64, FieldValue::I64(v)) => {
563                    self.index_numeric_field(*field, doc_id, *v as u64)?;
564                }
565                (FieldType::F64, FieldValue::F64(v)) => {
566                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
567                }
568                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
569                    self.index_dense_vector_field(*field, doc_id, vec)?;
570                }
571                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
572                    self.index_sparse_vector_field(*field, doc_id, entries)?;
573                }
574                _ => {}
575            }
576        }
577
578        // Stream document to disk immediately
579        self.write_document_to_store(&doc)?;
580
581        Ok(doc_id)
582    }
583
584    /// Index a text field using interned terms
585    ///
586    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
587    /// Instead of allocating a String per token, we:
588    /// 1. Iterate over whitespace-split words
589    /// 2. Build lowercase token in a reusable buffer
590    /// 3. Intern directly from the buffer
591    ///
592    /// If position recording is enabled for this field, also records token positions
593    /// encoded as (element_ordinal << 20) | token_position.
594    fn index_text_field(
595        &mut self,
596        field: Field,
597        doc_id: DocId,
598        text: &str,
599        element_ordinal: u32,
600    ) -> Result<u32> {
601        use crate::dsl::PositionMode;
602
603        let field_id = field.0;
604        let position_mode = self
605            .position_enabled_fields
606            .get(&field_id)
607            .copied()
608            .flatten();
609
610        // Phase 1: Aggregate term frequencies within this document
611        // Also collect positions if enabled
612        // Reuse buffer to avoid allocations
613        self.local_tf_buffer.clear();
614
615        // For position tracking: term -> list of positions in this text
616        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
617
618        let mut token_position = 0u32;
619
620        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
621        for word in text.split_whitespace() {
622            // Build lowercase token in reusable buffer
623            self.token_buffer.clear();
624            for c in word.chars() {
625                if c.is_alphanumeric() {
626                    for lc in c.to_lowercase() {
627                        self.token_buffer.push(lc);
628                    }
629                }
630            }
631
632            if self.token_buffer.is_empty() {
633                continue;
634            }
635
636            // Intern the term directly from buffer - O(1) amortized
637            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
638            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
639
640            // Record position based on mode
641            if let Some(mode) = position_mode {
642                let encoded_pos = match mode {
643                    // Ordinal only: just store element ordinal (token position = 0)
644                    PositionMode::Ordinal => element_ordinal << 20,
645                    // Token position only: just store token position (ordinal = 0)
646                    PositionMode::TokenPosition => token_position,
647                    // Full: encode both
648                    PositionMode::Full => (element_ordinal << 20) | token_position,
649                };
650                local_positions
651                    .entry(term_spur)
652                    .or_default()
653                    .push(encoded_pos);
654            }
655
656            token_position += 1;
657        }
658
659        // Phase 2: Insert aggregated terms into inverted index
660        // Now we only do one inverted_index lookup per unique term in doc
661        for (&term_spur, &tf) in &self.local_tf_buffer {
662            let term_key = TermKey {
663                field: field_id,
664                term: term_spur,
665            };
666
667            let posting = self
668                .inverted_index
669                .entry(term_key)
670                .or_insert_with(PostingListBuilder::new);
671            posting.add(doc_id, tf);
672
673            // Add positions if enabled
674            if position_mode.is_some()
675                && let Some(positions) = local_positions.get(&term_spur)
676            {
677                let pos_posting = self
678                    .position_index
679                    .entry(term_key)
680                    .or_insert_with(PositionPostingListBuilder::new);
681                for &pos in positions {
682                    pos_posting.add_position(doc_id, pos);
683                }
684            }
685        }
686
687        Ok(token_position)
688    }
689
690    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
691        // For numeric fields, we use a special encoding
692        let term_str = format!("__num_{}", value);
693        let term_spur = self.term_interner.get_or_intern(&term_str);
694
695        let term_key = TermKey {
696            field: field.0,
697            term: term_spur,
698        };
699
700        let posting = self
701            .inverted_index
702            .entry(term_key)
703            .or_insert_with(PostingListBuilder::new);
704        posting.add(doc_id, 1);
705
706        Ok(())
707    }
708
709    /// Index a dense vector field
710    fn index_dense_vector_field(
711        &mut self,
712        field: Field,
713        doc_id: DocId,
714        vector: &[f32],
715    ) -> Result<()> {
716        let dim = vector.len();
717
718        let builder = self
719            .dense_vectors
720            .entry(field.0)
721            .or_insert_with(|| DenseVectorBuilder::new(dim));
722
723        // Verify dimension consistency
724        if builder.dim != dim && builder.len() > 0 {
725            return Err(crate::Error::Schema(format!(
726                "Dense vector dimension mismatch: expected {}, got {}",
727                builder.dim, dim
728            )));
729        }
730
731        builder.add(doc_id, vector);
732        Ok(())
733    }
734
735    /// Index a sparse vector field using dedicated sparse posting lists
736    ///
737    /// Collects (doc_id, weight) postings per dimension. During commit, these are
738    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
739    ///
740    /// Weights below the configured `weight_threshold` are not indexed.
741    fn index_sparse_vector_field(
742        &mut self,
743        field: Field,
744        doc_id: DocId,
745        entries: &[(u32, f32)],
746    ) -> Result<()> {
747        // Get weight threshold from field config (default 0.0 = no filtering)
748        let weight_threshold = self
749            .schema
750            .get_field_entry(field)
751            .and_then(|entry| entry.sparse_vector_config.as_ref())
752            .map(|config| config.weight_threshold)
753            .unwrap_or(0.0);
754
755        let builder = self
756            .sparse_vectors
757            .entry(field.0)
758            .or_insert_with(SparseVectorBuilder::new);
759
760        for &(dim_id, weight) in entries {
761            // Skip weights below threshold
762            if weight.abs() < weight_threshold {
763                continue;
764            }
765
766            builder.add(dim_id, doc_id, weight);
767        }
768
769        Ok(())
770    }
771
772    /// Write document to streaming store
773    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
774        use byteorder::{LittleEndian, WriteBytesExt};
775
776        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
777
778        self.store_file
779            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
780        self.store_file.write_all(&doc_bytes)?;
781
782        Ok(())
783    }
784
    /// Build the final segment.
    ///
    /// Consumes the builder and writes the segment's files into `dir`:
    /// term dictionary, postings and document store always; positions, dense
    /// vectors and sparse vectors only when their data is non-empty; and
    /// finally the segment metadata. The temporary store file is removed at
    /// the end.
    ///
    /// # Returns
    /// The `SegmentMeta` that was written (id, document count, field stats).
    ///
    /// # Errors
    /// Propagates any failure from flushing the store, building a component,
    /// or writing to `dir`.
    // NOTE(review): every `?` before the final `remove_file` returns early
    // without deleting the temporary store file; unless a `Drop` impl outside
    // this section cleans it up, failed builds leave the file behind.
    // NOTE(review): `rayon::join` blocks the calling thread inside this
    // `async fn` until both closures finish — confirm that is acceptable for
    // the executor this runs on.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Build positions FIRST to get offsets for TermInfo
        let (positions_data, position_offsets) = self.build_positions_file()?;

        // Extract data needed for parallel processing
        // (copied out so the store closure does not need to borrow `self`
        // while the postings closure is using it)
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Build postings and document store in parallel
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(&position_offsets),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        // Both closures have completed at this point; surface their errors.
        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Write positions file (data only, offsets are in TermInfo)
        if !positions_data.is_empty() {
            dir.write(&files.positions, &positions_data).await?;
        }

        // Build and write dense vector indexes (RaBitQ) - all in one file
        if !self.dense_vectors.is_empty() {
            let vectors_data = self.build_vectors_file()?;
            if !vectors_data.is_empty() {
                dir.write(&files.vectors, &vectors_data).await?;
            }
        }

        // Build and write sparse vector posting lists
        if !self.sparse_vectors.is_empty() {
            let sparse_data = self.build_sparse_file()?;
            if !sparse_data.is_empty() {
                dir.write(&files.sparse, &sparse_data).await?;
            }
        }

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files (best effort: a failed removal is ignored)
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }
860
861    /// Build unified vectors file containing all dense vector indexes
862    ///
863    /// File format:
864    /// - Header: num_fields (u32)
865    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
866    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
867    fn build_vectors_file(&self) -> Result<Vec<u8>> {
868        use byteorder::{LittleEndian, WriteBytesExt};
869
870        // Build all indexes first: (field_id, index_type, data)
871        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
872
873        for (&field_id, builder) in &self.dense_vectors {
874            if builder.len() == 0 {
875                continue;
876            }
877
878            let field = crate::dsl::Field(field_id);
879
880            // Get dense vector config
881            let dense_config = self
882                .schema
883                .get_field_entry(field)
884                .and_then(|e| e.dense_vector_config.as_ref());
885
886            // Get vectors, potentially trimmed for matryoshka/MRL indexing
887            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
888            let vectors = if index_dim < builder.dim {
889                // Trim vectors to mrl_dim for indexing
890                builder.get_vectors_trimmed(index_dim)
891            } else {
892                builder.get_vectors()
893            };
894
895            // During normal indexing, segments always store Flat (raw vectors).
896            // ANN indexes are built at index-level via build_vector_index() which
897            // trains centroids/codebooks once from all vectors and triggers rebuild.
898            let flat_data = FlatVectorData {
899                dim: index_dim,
900                vectors: vectors.clone(),
901                doc_ids: builder.doc_ids.clone(),
902            };
903            let index_bytes = serde_json::to_vec(&flat_data)
904                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
905            let index_type = 3u8; // 3 = Flat
906
907            field_indexes.push((field_id, index_type, index_bytes));
908        }
909
910        if field_indexes.is_empty() {
911            return Ok(Vec::new());
912        }
913
914        // Sort by field_id for consistent ordering
915        field_indexes.sort_by_key(|(id, _, _)| *id);
916
917        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
918        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
919
920        // Build output
921        let mut output = Vec::new();
922
923        // Write number of fields
924        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
925
926        // Calculate offsets and write header entries
927        let mut current_offset = header_size as u64;
928        for (field_id, index_type, data) in &field_indexes {
929            output.write_u32::<LittleEndian>(*field_id)?;
930            output.write_u8(*index_type)?;
931            output.write_u64::<LittleEndian>(current_offset)?;
932            output.write_u64::<LittleEndian>(data.len() as u64)?;
933            current_offset += data.len() as u64;
934        }
935
936        // Write data
937        for (_, _, data) in field_indexes {
938            output.extend_from_slice(&data);
939        }
940
941        Ok(output)
942    }
943
944    /// Build sparse vectors file containing BlockSparsePostingList per field/dimension
945    ///
946    /// File format (direct-indexed table for O(1) dimension lookup):
947    /// - Header: num_fields (u32)
948    /// - For each field:
949    ///   - field_id (u32)
950    ///   - quantization (u8)
951    ///   - max_dim_id (u32)          ← highest dimension ID + 1 (table size)
952    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed by dim_id
953    ///     (offset=0, length=0 means dimension not present)
954    /// - Data: concatenated serialized BlockSparsePostingList
955    fn build_sparse_file(&self) -> Result<Vec<u8>> {
956        use crate::structures::{BlockSparsePostingList, WeightQuantization};
957        use byteorder::{LittleEndian, WriteBytesExt};
958
959        if self.sparse_vectors.is_empty() {
960            return Ok(Vec::new());
961        }
962
963        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> serialized_bytes)
964        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
965        let mut field_data: Vec<SparseFieldData> = Vec::new();
966
967        for (&field_id, builder) in &self.sparse_vectors {
968            if builder.is_empty() {
969                continue;
970            }
971
972            let field = crate::dsl::Field(field_id);
973
974            // Get quantization from field config
975            let quantization = self
976                .schema
977                .get_field_entry(field)
978                .and_then(|e| e.sparse_vector_config.as_ref())
979                .map(|c| c.weight_quantization)
980                .unwrap_or(WeightQuantization::Float32);
981
982            // Find max dimension ID
983            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);
984
985            // Build BlockSparsePostingList for each dimension
986            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
987
988            for (&dim_id, postings) in &builder.postings {
989                // Sort postings by doc_id
990                let mut sorted_postings = postings.clone();
991                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);
992
993                // Build BlockSparsePostingList
994                let block_list =
995                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
996                        .map_err(crate::Error::Io)?;
997
998                // Serialize
999                let mut bytes = Vec::new();
1000                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
1001
1002                dim_bytes.insert(dim_id, bytes);
1003            }
1004
1005            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
1006        }
1007
1008        if field_data.is_empty() {
1009            return Ok(Vec::new());
1010        }
1011
1012        // Sort by field_id
1013        field_data.sort_by_key(|(id, _, _, _)| *id);
1014
1015        // Calculate header size
1016        // Header: num_fields (4)
1017        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
1018        let mut header_size = 4u64;
1019        for (_, _, max_dim_id, _) in &field_data {
1020            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
1021            header_size += (*max_dim_id as u64) * 12; // table entries: (offset: u64, length: u32)
1022        }
1023
1024        // Build output
1025        let mut output = Vec::new();
1026
1027        // Write num_fields
1028        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
1029
1030        // Track current data offset (after all headers)
1031        let mut current_offset = header_size;
1032
1033        // First, collect all data bytes in order and build offset tables
1034        let mut all_data: Vec<u8> = Vec::new();
1035        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
1036
1037        for (_, _, max_dim_id, dim_bytes) in &field_data {
1038            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
1039
1040            // Process dimensions in order
1041            for dim_id in 0..*max_dim_id {
1042                if let Some(bytes) = dim_bytes.get(&dim_id) {
1043                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
1044                    current_offset += bytes.len() as u64;
1045                    all_data.extend_from_slice(bytes);
1046                }
1047                // else: table entry stays (0, 0) meaning dimension not present
1048            }
1049
1050            field_tables.push(table);
1051        }
1052
1053        // Write field headers and tables
1054        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
1055            output.write_u32::<LittleEndian>(*field_id)?;
1056            output.write_u8(*quantization as u8)?;
1057            output.write_u32::<LittleEndian>(*max_dim_id)?;
1058
1059            // Write table (direct indexed by dim_id)
1060            for &(offset, length) in &field_tables[i] {
1061                output.write_u64::<LittleEndian>(offset)?;
1062                output.write_u32::<LittleEndian>(length)?;
1063            }
1064        }
1065
1066        // Write data
1067        output.extend_from_slice(&all_data);
1068
1069        Ok(output)
1070    }
1071
1072    /// Build positions file for phrase queries
1073    ///
1074    /// File format:
1075    /// - Data only: concatenated serialized PositionPostingList
1076    /// - Position offsets are stored in TermInfo (no separate header needed)
1077    ///
1078    /// Returns: (positions_data, term_key -> (offset, len) mapping)
1079    #[allow(clippy::type_complexity)]
1080    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
1081        use crate::structures::PositionPostingList;
1082
1083        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1084
1085        if self.position_index.is_empty() {
1086            return Ok((Vec::new(), position_offsets));
1087        }
1088
1089        // Collect and sort entries by key
1090        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
1091            .position_index
1092            .iter()
1093            .map(|(term_key, pos_list)| {
1094                let term_str = self.term_interner.resolve(&term_key.term);
1095                let mut key = Vec::with_capacity(4 + term_str.len());
1096                key.extend_from_slice(&term_key.field.to_le_bytes());
1097                key.extend_from_slice(term_str.as_bytes());
1098                (key, pos_list)
1099            })
1100            .collect();
1101
1102        entries.sort_by(|a, b| a.0.cmp(&b.0));
1103
1104        // Serialize all position lists and track offsets
1105        let mut output = Vec::new();
1106
1107        for (key, pos_builder) in entries {
1108            // Convert builder to PositionPostingList
1109            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1110            for (doc_id, positions) in &pos_builder.postings {
1111                pos_list.push(*doc_id, positions.clone());
1112            }
1113
1114            // Serialize and track offset
1115            let offset = output.len() as u64;
1116            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
1117            let len = (output.len() as u64 - offset) as u32;
1118
1119            position_offsets.insert(key, (offset, len));
1120        }
1121
1122        Ok((output, position_offsets))
1123    }
1124
1125    /// Build postings from inverted index
1126    ///
1127    /// Uses parallel processing to serialize posting lists concurrently.
1128    /// Position offsets are looked up and embedded in TermInfo.
1129    fn build_postings(
1130        &mut self,
1131        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1132    ) -> Result<(Vec<u8>, Vec<u8>)> {
1133        // Phase 1: Collect and sort term keys (parallel key generation)
1134        // Key format: field_id (4 bytes) + term bytes
1135        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
1136            .inverted_index
1137            .iter()
1138            .map(|(term_key, posting_list)| {
1139                let term_str = self.term_interner.resolve(&term_key.term);
1140                let mut key = Vec::with_capacity(4 + term_str.len());
1141                key.extend_from_slice(&term_key.field.to_le_bytes());
1142                key.extend_from_slice(term_str.as_bytes());
1143                (key, posting_list)
1144            })
1145            .collect();
1146
1147        // Sort by key for SSTable ordering
1148        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1149
1150        // Phase 2: Parallel serialization of posting lists
1151        // Each term's posting list is serialized independently
1152        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1153            .into_par_iter()
1154            .map(|(key, posting_builder)| {
1155                // Build posting list from in-memory postings
1156                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1157                for p in &posting_builder.postings {
1158                    full_postings.push(p.doc_id, p.term_freq as u32);
1159                }
1160
1161                // Build term info
1162                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1163                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1164
1165                // Don't inline if term has positions (inline format doesn't support position offsets)
1166                let has_positions = position_offsets.contains_key(&key);
1167                let result = if !has_positions
1168                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1169                {
1170                    SerializedPosting::Inline(inline)
1171                } else {
1172                    // Serialize to local buffer
1173                    let mut posting_bytes = Vec::new();
1174                    let block_list =
1175                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
1176                            .expect("BlockPostingList creation failed");
1177                    block_list
1178                        .serialize(&mut posting_bytes)
1179                        .expect("BlockPostingList serialization failed");
1180                    SerializedPosting::External {
1181                        bytes: posting_bytes,
1182                        doc_count: full_postings.doc_count(),
1183                    }
1184                };
1185
1186                (key, result)
1187            })
1188            .collect();
1189
1190        // Phase 3: Sequential assembly (must be sequential for offset calculation)
1191        let mut term_dict = Vec::new();
1192        let mut postings = Vec::new();
1193        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1194
1195        for (key, serialized_posting) in serialized {
1196            let term_info = match serialized_posting {
1197                SerializedPosting::Inline(info) => info,
1198                SerializedPosting::External { bytes, doc_count } => {
1199                    let posting_offset = postings.len() as u64;
1200                    let posting_len = bytes.len() as u32;
1201                    postings.extend_from_slice(&bytes);
1202
1203                    // Look up position offset for this term
1204                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1205                        TermInfo::external_with_positions(
1206                            posting_offset,
1207                            posting_len,
1208                            doc_count,
1209                            pos_offset,
1210                            pos_len,
1211                        )
1212                    } else {
1213                        TermInfo::external(posting_offset, posting_len, doc_count)
1214                    }
1215                }
1216            };
1217
1218            writer.insert(&key, &term_info)?;
1219        }
1220
1221        writer.finish()?;
1222        Ok((term_dict, postings))
1223    }
1224
1225    /// Build document store from streamed temp file (static method for parallel execution)
1226    ///
1227    /// Uses parallel processing to deserialize documents concurrently.
1228    fn build_store_parallel(
1229        store_path: &PathBuf,
1230        schema: &Schema,
1231        num_compression_threads: usize,
1232        compression_level: CompressionLevel,
1233    ) -> Result<Vec<u8>> {
1234        use super::store::EagerParallelStoreWriter;
1235
1236        let file = File::open(store_path)?;
1237        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1238
1239        // Phase 1: Parse document boundaries (sequential, fast)
1240        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1241        let mut offset = 0usize;
1242        while offset + 4 <= mmap.len() {
1243            let doc_len = u32::from_le_bytes([
1244                mmap[offset],
1245                mmap[offset + 1],
1246                mmap[offset + 2],
1247                mmap[offset + 3],
1248            ]) as usize;
1249            offset += 4;
1250
1251            if offset + doc_len > mmap.len() {
1252                break;
1253            }
1254
1255            doc_ranges.push((offset, doc_len));
1256            offset += doc_len;
1257        }
1258
1259        // Phase 2: Parallel deserialization of documents
1260        let docs: Vec<Document> = doc_ranges
1261            .into_par_iter()
1262            .filter_map(|(start, len)| {
1263                let doc_bytes = &mmap[start..start + len];
1264                super::store::deserialize_document(doc_bytes, schema).ok()
1265            })
1266            .collect();
1267
1268        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
1269        let mut store_data = Vec::new();
1270        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1271            &mut store_data,
1272            num_compression_threads,
1273            compression_level,
1274        );
1275
1276        for doc in &docs {
1277            store_writer.store(doc, schema)?;
1278        }
1279
1280        store_writer.finish()?;
1281        Ok(store_data)
1282    }
1283}
1284
1285impl Drop for SegmentBuilder {
1286    fn drop(&mut self) {
1287        // Cleanup temp files on drop
1288        let _ = std::fs::remove_file(&self.store_path);
1289    }
1290}