Skip to main content

hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14
15use hashbrown::HashMap;
16use lasso::{Rodeo, Spur};
17use rayon::prelude::*;
18use rustc_hash::FxHashMap;
19
20use serde::{Deserialize, Serialize};
21
22use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
23use crate::compression::CompressionLevel;
24use crate::directories::{Directory, DirectoryWriter};
25use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
26use crate::structures::{PostingList, SSTableWriter, TermInfo};
27use crate::tokenizer::BoxedTokenizer;
28use crate::{DocId, Result};
29
/// Flat vector data for brute-force search (accumulating state)
///
/// Plain serde-serializable container: raw vectors plus the document IDs
/// they were collected for.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlatVectorData {
    /// Dimension recorded for the stored vectors
    pub dim: usize,
    /// Stored vectors, one inner `Vec<f32>` per vector
    pub vectors: Vec<Vec<f32>>,
    /// Document IDs; presumably `doc_ids[i]` belongs to `vectors[i]` — TODO confirm against readers
    pub doc_ids: Vec<u32>,
}
37
/// IVF-RaBitQ index data with embedded centroids and codebook
///
/// Bundles the index with the centroids and codebook so all three are
/// serialized and deserialized as one unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IVFRaBitQIndexData {
    /// The IVF-RaBitQ index itself
    pub index: crate::structures::IVFRaBitQIndex,
    /// Coarse centroids embedded alongside the index
    pub centroids: crate::structures::CoarseCentroids,
    /// RaBitQ codebook embedded alongside the index
    pub codebook: crate::structures::RaBitQCodebook,
}
45
46impl IVFRaBitQIndexData {
47    pub fn to_bytes(&self) -> std::io::Result<Vec<u8>> {
48        serde_json::to_vec(self)
49            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
50    }
51
52    pub fn from_bytes(data: &[u8]) -> std::io::Result<Self> {
53        serde_json::from_slice(data)
54            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
55    }
56}
57
/// ScaNN index data with embedded centroids and codebook
///
/// Bundles the IVF-PQ index with the centroids and codebook so all three are
/// serialized and deserialized as one unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScaNNIndexData {
    /// The IVF-PQ index itself
    pub index: crate::structures::IVFPQIndex,
    /// Coarse centroids embedded alongside the index
    pub centroids: crate::structures::CoarseCentroids,
    /// Product-quantization codebook embedded alongside the index
    pub codebook: crate::structures::PQCodebook,
}
65
66impl ScaNNIndexData {
67    pub fn to_bytes(&self) -> std::io::Result<Vec<u8>> {
68        serde_json::to_vec(self)
69            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
70    }
71
72    pub fn from_bytes(data: &[u8]) -> std::io::Result<Self> {
73        serde_json::from_slice(data)
74            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
75    }
76}
77
/// Capacity of the in-memory write buffer in front of the temporary
/// document store file (16 MiB).
const STORE_BUFFER_SIZE: usize = 16 << 20;
80
/// Interned term key combining field and term
///
/// Small `Copy` key used for the builder's term-keyed hash maps.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    /// Numeric field ID (`Field.0`)
    field: u32,
    /// Interned term handle issued by the `lasso` interner
    term: Spur,
}
87
/// Compact posting entry for in-memory storage
#[derive(Clone, Copy)]
struct CompactPosting {
    /// Document this posting belongs to
    doc_id: DocId,
    /// Term frequency within the document, stored in 16 bits to keep the entry small
    term_freq: u16,
}
94
/// In-memory posting list for a term
struct PostingListBuilder {
    /// In-memory postings, appended in the order documents are added
    postings: Vec<CompactPosting>,
}
100
101impl PostingListBuilder {
102    fn new() -> Self {
103        Self {
104            postings: Vec::new(),
105        }
106    }
107
108    /// Add a posting, merging if same doc_id as last
109    #[inline]
110    fn add(&mut self, doc_id: DocId, term_freq: u32) {
111        // Check if we can merge with the last posting
112        if let Some(last) = self.postings.last_mut()
113            && last.doc_id == doc_id
114        {
115            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
116            return;
117        }
118        self.postings.push(CompactPosting {
119            doc_id,
120            term_freq: term_freq.min(u16::MAX as u32) as u16,
121        });
122    }
123
124    fn len(&self) -> usize {
125        self.postings.len()
126    }
127}
128
/// In-memory position posting list for a term (for fields with record_positions=true)
struct PositionPostingListBuilder {
    /// Doc ID -> list of positions (encoded as element_ordinal << 20 | token_position)
    ///
    /// Stored as a vector of `(doc_id, positions)` pairs in insertion order;
    /// consecutive positions for the same document share one pair.
    postings: Vec<(DocId, Vec<u32>)>,
}
134
135impl PositionPostingListBuilder {
136    fn new() -> Self {
137        Self {
138            postings: Vec::new(),
139        }
140    }
141
142    /// Add a position for a document
143    #[inline]
144    fn add_position(&mut self, doc_id: DocId, position: u32) {
145        if let Some((last_doc, positions)) = self.postings.last_mut()
146            && *last_doc == doc_id
147        {
148            positions.push(position);
149            return;
150        }
151        self.postings.push((doc_id, vec![position]));
152    }
153}
154
/// Intermediate result for parallel posting serialization
enum SerializedPosting {
    /// Inline posting (small enough to fit in TermInfo)
    Inline(TermInfo),
    /// External posting with serialized bytes
    ///
    /// `bytes` holds the serialized posting list; `doc_count` is carried
    /// alongside it (presumably the number of documents in the list — TODO confirm at the producer).
    External { bytes: Vec<u8>, doc_count: u32 },
}
162
/// Statistics for debugging segment builder performance
///
/// A point-in-time snapshot; the byte figures are estimates, not measurements.
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector (element count, not bytes)
    pub doc_field_lengths_size: usize,
    /// Estimated total memory usage in bytes
    pub estimated_memory_bytes: usize,
    /// Memory breakdown by component
    pub memory_breakdown: MemoryBreakdown,
}
181
/// Detailed memory breakdown by component
///
/// All `*_bytes` fields are estimates in bytes; `Default` yields all zeros.
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    /// Postings memory (CompactPosting structs)
    pub postings_bytes: usize,
    /// Inverted index HashMap overhead
    pub index_overhead_bytes: usize,
    /// Term interner memory
    pub interner_bytes: usize,
    /// Document field lengths
    pub field_lengths_bytes: usize,
    /// Dense vector storage
    pub dense_vectors_bytes: usize,
    /// Number of dense vectors (a count, not bytes)
    pub dense_vector_count: usize,
}
198
/// Configuration for segment builder
///
/// `Clone` so one configuration can be handed to several builders.
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}
213
214impl Default for SegmentBuilderConfig {
215    fn default() -> Self {
216        Self {
217            temp_dir: std::env::temp_dir(),
218            compression_level: CompressionLevel(7),
219            num_compression_threads: num_cpus::get(),
220            interner_capacity: 1_000_000,
221            posting_map_capacity: 500_000,
222        }
223    }
224}
225
/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to a buffered temporary file as they are added
///   (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    /// Schema describing the fields of incoming documents
    schema: Schema,
    /// Builder configuration (temp dir, compression, capacities)
    config: SegmentBuilderConfig,
    /// Tokenizers registered per field via `set_tokenizer`
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    /// Path of the temporary file behind `store_file`
    store_path: PathBuf,

    /// Document count; also the ID assigned to the next added document
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of slots each document occupies in `doc_field_lengths`
    num_indexed_fields: usize,
    /// Field ID -> slot index within a document's run of `doc_field_lengths`
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Dense vector storage per field: field -> DenseVectorBuilder
    /// Vectors are stored as flat f32 arrays
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,
}
285
/// Builder for dense vector index using RaBitQ
///
/// Accumulates the vectors of one field in a single flat buffer.
struct DenseVectorBuilder {
    /// Dimension of vectors
    dim: usize,
    /// Document IDs with vectors, in insertion order
    doc_ids: Vec<DocId>,
    /// Flat vector storage (doc_ids.len() * dim floats); vector `i` occupies
    /// `vectors[i * dim..(i + 1) * dim]`
    vectors: Vec<f32>,
}
295
296impl DenseVectorBuilder {
297    fn new(dim: usize) -> Self {
298        Self {
299            dim,
300            doc_ids: Vec::new(),
301            vectors: Vec::new(),
302        }
303    }
304
305    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
306        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
307        self.doc_ids.push(doc_id);
308        self.vectors.extend_from_slice(vector);
309    }
310
311    fn len(&self) -> usize {
312        self.doc_ids.len()
313    }
314
315    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
316    fn get_vectors(&self) -> Vec<Vec<f32>> {
317        self.doc_ids
318            .iter()
319            .enumerate()
320            .map(|(i, _)| {
321                let start = i * self.dim;
322                self.vectors[start..start + self.dim].to_vec()
323            })
324            .collect()
325    }
326
327    /// Get vectors trimmed to specified dimension for matryoshka/MRL indexing
328    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
329        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
330        self.doc_ids
331            .iter()
332            .enumerate()
333            .map(|(i, _)| {
334                let start = i * self.dim;
335                self.vectors[start..start + trim_dim].to_vec()
336            })
337            .collect()
338    }
339}
340
/// Builder for sparse vector index using BlockSparsePostingList
///
/// Collects (doc_id, weight) postings per dimension, then builds
/// BlockSparsePostingList with proper quantization during commit.
struct SparseVectorBuilder {
    /// Postings per dimension: dim_id -> Vec<(doc_id, weight)>, each list in insertion order
    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
}
349
350impl SparseVectorBuilder {
351    fn new() -> Self {
352        Self {
353            postings: FxHashMap::default(),
354        }
355    }
356
357    /// Add a sparse vector entry
358    #[inline]
359    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
360        self.postings
361            .entry(dim_id)
362            .or_default()
363            .push((doc_id, weight));
364    }
365
366    fn is_empty(&self) -> bool {
367        self.postings.is_empty()
368    }
369}
370
371impl SegmentBuilder {
372    /// Create a new segment builder
373    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
374        let segment_id = uuid::Uuid::new_v4();
375        let store_path = config
376            .temp_dir
377            .join(format!("hermes_store_{}.tmp", segment_id));
378
379        let store_file = BufWriter::with_capacity(
380            STORE_BUFFER_SIZE,
381            OpenOptions::new()
382                .create(true)
383                .write(true)
384                .truncate(true)
385                .open(&store_path)?,
386        );
387
388        // Count indexed fields for compact field length storage
389        // Also track which fields have position recording enabled
390        let mut num_indexed_fields = 0;
391        let mut field_to_slot = FxHashMap::default();
392        let mut position_enabled_fields = FxHashMap::default();
393        for (field, entry) in schema.fields() {
394            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
395                field_to_slot.insert(field.0, num_indexed_fields);
396                num_indexed_fields += 1;
397                if entry.positions.is_some() {
398                    position_enabled_fields.insert(field.0, entry.positions);
399                }
400            }
401        }
402
403        Ok(Self {
404            schema,
405            tokenizers: FxHashMap::default(),
406            term_interner: Rodeo::new(),
407            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
408            store_file,
409            store_path,
410            next_doc_id: 0,
411            field_stats: FxHashMap::default(),
412            doc_field_lengths: Vec::new(),
413            num_indexed_fields,
414            field_to_slot,
415            local_tf_buffer: FxHashMap::default(),
416            token_buffer: String::with_capacity(64),
417            config,
418            dense_vectors: FxHashMap::default(),
419            sparse_vectors: FxHashMap::default(),
420            position_index: HashMap::new(),
421            position_enabled_fields,
422            current_element_ordinal: FxHashMap::default(),
423        })
424    }
425
    /// Register `tokenizer` for `field`, replacing any tokenizer previously
    /// registered for that field.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }
429
    /// Number of documents added so far (equal to the ID the next document
    /// will receive).
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
433
434    /// Get current statistics for debugging performance
435    pub fn stats(&self) -> SegmentBuilderStats {
436        use std::mem::size_of;
437
438        let postings_in_memory: usize =
439            self.inverted_index.values().map(|p| p.postings.len()).sum();
440
441        // Precise memory calculation using actual struct sizes
442        // CompactPosting: doc_id (u32) + term_freq (u16) = 6 bytes, but may have padding
443        let compact_posting_size = size_of::<CompactPosting>();
444
445        // Postings: actual Vec capacity * element size + Vec overhead (24 bytes on 64-bit)
446        let postings_bytes: usize = self
447            .inverted_index
448            .values()
449            .map(|p| {
450                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
451            })
452            .sum();
453
454        // Inverted index: HashMap overhead per entry
455        // Each entry: TermKey (field u32 + Spur 4 bytes = 8 bytes) + PostingListBuilder + HashMap bucket overhead
456        // HashMap typically uses ~1.5x capacity, each bucket ~16-24 bytes
457        let term_key_size = size_of::<TermKey>();
458        let posting_builder_size = size_of::<PostingListBuilder>();
459        let hashmap_entry_overhead = 24; // bucket pointer + metadata
460        let index_overhead_bytes = self.inverted_index.len()
461            * (term_key_size + posting_builder_size + hashmap_entry_overhead);
462
463        // Term interner: Rodeo stores strings + metadata
464        // Each interned string: actual string bytes + Spur (4 bytes) + internal overhead (~16 bytes)
465        // We can't get exact string lengths, so estimate average term length of 8 bytes
466        let avg_term_len = 8;
467        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
468        let interner_bytes =
469            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);
470
471        // Doc field lengths: Vec<u32> with capacity
472        let field_lengths_bytes =
473            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
474
475        // Dense vectors: actual capacity used
476        let mut dense_vectors_bytes: usize = 0;
477        let mut dense_vector_count: usize = 0;
478        for b in self.dense_vectors.values() {
479            // vectors: Vec<f32> capacity + doc_ids: Vec<DocId> capacity
480            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
481                + b.doc_ids.capacity() * size_of::<DocId>()
482                + size_of::<Vec<f32>>()
483                + size_of::<Vec<DocId>>();
484            dense_vector_count += b.doc_ids.len();
485        }
486
487        // Local buffers
488        let local_tf_buffer_bytes =
489            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);
490
491        let estimated_memory_bytes = postings_bytes
492            + index_overhead_bytes
493            + interner_bytes
494            + field_lengths_bytes
495            + dense_vectors_bytes
496            + local_tf_buffer_bytes;
497
498        let memory_breakdown = MemoryBreakdown {
499            postings_bytes,
500            index_overhead_bytes,
501            interner_bytes,
502            field_lengths_bytes,
503            dense_vectors_bytes,
504            dense_vector_count,
505        };
506
507        SegmentBuilderStats {
508            num_docs: self.next_doc_id,
509            unique_terms: self.inverted_index.len(),
510            postings_in_memory,
511            interned_strings: self.term_interner.len(),
512            doc_field_lengths_size: self.doc_field_lengths.len(),
513            estimated_memory_bytes,
514            memory_breakdown,
515        }
516    }
517
518    /// Add a document - streams to disk immediately
519    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
520        let doc_id = self.next_doc_id;
521        self.next_doc_id += 1;
522
523        // Initialize field lengths for this document
524        let base_idx = self.doc_field_lengths.len();
525        self.doc_field_lengths
526            .resize(base_idx + self.num_indexed_fields, 0);
527
528        // Reset element ordinals for this document (for multi-valued fields)
529        self.current_element_ordinal.clear();
530
531        for (field, value) in doc.field_values() {
532            let entry = self.schema.get_field_entry(*field);
533            if entry.is_none() || !entry.unwrap().indexed {
534                continue;
535            }
536
537            let entry = entry.unwrap();
538            match (&entry.field_type, value) {
539                (FieldType::Text, FieldValue::Text(text)) => {
540                    // Get current element ordinal for multi-valued fields
541                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
542                    let token_count =
543                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
544                    // Increment element ordinal for next value of this field
545                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
546
547                    // Update field statistics
548                    let stats = self.field_stats.entry(field.0).or_default();
549                    stats.total_tokens += token_count as u64;
550                    stats.doc_count += 1;
551
552                    // Store field length compactly
553                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
554                        self.doc_field_lengths[base_idx + slot] = token_count;
555                    }
556                }
557                (FieldType::U64, FieldValue::U64(v)) => {
558                    self.index_numeric_field(*field, doc_id, *v)?;
559                }
560                (FieldType::I64, FieldValue::I64(v)) => {
561                    self.index_numeric_field(*field, doc_id, *v as u64)?;
562                }
563                (FieldType::F64, FieldValue::F64(v)) => {
564                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
565                }
566                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
567                    self.index_dense_vector_field(*field, doc_id, vec)?;
568                }
569                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
570                    self.index_sparse_vector_field(*field, doc_id, entries)?;
571                }
572                _ => {}
573            }
574        }
575
576        // Stream document to disk immediately
577        self.write_document_to_store(&doc)?;
578
579        Ok(doc_id)
580    }
581
582    /// Index a text field using interned terms
583    ///
584    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
585    /// Instead of allocating a String per token, we:
586    /// 1. Iterate over whitespace-split words
587    /// 2. Build lowercase token in a reusable buffer
588    /// 3. Intern directly from the buffer
589    ///
590    /// If position recording is enabled for this field, also records token positions
591    /// encoded as (element_ordinal << 20) | token_position.
592    fn index_text_field(
593        &mut self,
594        field: Field,
595        doc_id: DocId,
596        text: &str,
597        element_ordinal: u32,
598    ) -> Result<u32> {
599        use crate::dsl::PositionMode;
600
601        let field_id = field.0;
602        let position_mode = self
603            .position_enabled_fields
604            .get(&field_id)
605            .copied()
606            .flatten();
607
608        // Phase 1: Aggregate term frequencies within this document
609        // Also collect positions if enabled
610        // Reuse buffer to avoid allocations
611        self.local_tf_buffer.clear();
612
613        // For position tracking: term -> list of positions in this text
614        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
615
616        let mut token_position = 0u32;
617
618        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
619        for word in text.split_whitespace() {
620            // Build lowercase token in reusable buffer
621            self.token_buffer.clear();
622            for c in word.chars() {
623                if c.is_alphanumeric() {
624                    for lc in c.to_lowercase() {
625                        self.token_buffer.push(lc);
626                    }
627                }
628            }
629
630            if self.token_buffer.is_empty() {
631                continue;
632            }
633
634            // Intern the term directly from buffer - O(1) amortized
635            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
636            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
637
638            // Record position based on mode
639            if let Some(mode) = position_mode {
640                let encoded_pos = match mode {
641                    // Ordinal only: just store element ordinal (token position = 0)
642                    PositionMode::Ordinal => element_ordinal << 20,
643                    // Token position only: just store token position (ordinal = 0)
644                    PositionMode::TokenPosition => token_position,
645                    // Full: encode both
646                    PositionMode::Full => (element_ordinal << 20) | token_position,
647                };
648                local_positions
649                    .entry(term_spur)
650                    .or_default()
651                    .push(encoded_pos);
652            }
653
654            token_position += 1;
655        }
656
657        // Phase 2: Insert aggregated terms into inverted index
658        // Now we only do one inverted_index lookup per unique term in doc
659        for (&term_spur, &tf) in &self.local_tf_buffer {
660            let term_key = TermKey {
661                field: field_id,
662                term: term_spur,
663            };
664
665            let posting = self
666                .inverted_index
667                .entry(term_key)
668                .or_insert_with(PostingListBuilder::new);
669            posting.add(doc_id, tf);
670
671            // Add positions if enabled
672            if position_mode.is_some()
673                && let Some(positions) = local_positions.get(&term_spur)
674            {
675                let pos_posting = self
676                    .position_index
677                    .entry(term_key)
678                    .or_insert_with(PositionPostingListBuilder::new);
679                for &pos in positions {
680                    pos_posting.add_position(doc_id, pos);
681                }
682            }
683        }
684
685        Ok(token_position)
686    }
687
688    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
689        // For numeric fields, we use a special encoding
690        let term_str = format!("__num_{}", value);
691        let term_spur = self.term_interner.get_or_intern(&term_str);
692
693        let term_key = TermKey {
694            field: field.0,
695            term: term_spur,
696        };
697
698        let posting = self
699            .inverted_index
700            .entry(term_key)
701            .or_insert_with(PostingListBuilder::new);
702        posting.add(doc_id, 1);
703
704        Ok(())
705    }
706
707    /// Index a dense vector field
708    fn index_dense_vector_field(
709        &mut self,
710        field: Field,
711        doc_id: DocId,
712        vector: &[f32],
713    ) -> Result<()> {
714        let dim = vector.len();
715
716        let builder = self
717            .dense_vectors
718            .entry(field.0)
719            .or_insert_with(|| DenseVectorBuilder::new(dim));
720
721        // Verify dimension consistency
722        if builder.dim != dim && builder.len() > 0 {
723            return Err(crate::Error::Schema(format!(
724                "Dense vector dimension mismatch: expected {}, got {}",
725                builder.dim, dim
726            )));
727        }
728
729        builder.add(doc_id, vector);
730        Ok(())
731    }
732
733    /// Index a sparse vector field using dedicated sparse posting lists
734    ///
735    /// Collects (doc_id, weight) postings per dimension. During commit, these are
736    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
737    ///
738    /// Weights below the configured `weight_threshold` are not indexed.
739    fn index_sparse_vector_field(
740        &mut self,
741        field: Field,
742        doc_id: DocId,
743        entries: &[(u32, f32)],
744    ) -> Result<()> {
745        // Get weight threshold from field config (default 0.0 = no filtering)
746        let weight_threshold = self
747            .schema
748            .get_field_entry(field)
749            .and_then(|entry| entry.sparse_vector_config.as_ref())
750            .map(|config| config.weight_threshold)
751            .unwrap_or(0.0);
752
753        let builder = self
754            .sparse_vectors
755            .entry(field.0)
756            .or_insert_with(SparseVectorBuilder::new);
757
758        for &(dim_id, weight) in entries {
759            // Skip weights below threshold
760            if weight.abs() < weight_threshold {
761                continue;
762            }
763
764            builder.add(dim_id, doc_id, weight);
765        }
766
767        Ok(())
768    }
769
770    /// Write document to streaming store
771    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
772        use byteorder::{LittleEndian, WriteBytesExt};
773
774        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
775
776        self.store_file
777            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
778        self.store_file.write_all(&doc_bytes)?;
779
780        Ok(())
781    }
782
783    /// Build the final segment
784    pub async fn build<D: Directory + DirectoryWriter>(
785        mut self,
786        dir: &D,
787        segment_id: SegmentId,
788    ) -> Result<SegmentMeta> {
789        // Flush any buffered data
790        self.store_file.flush()?;
791
792        let files = SegmentFiles::new(segment_id.0);
793
794        // Build positions FIRST to get offsets for TermInfo
795        let (positions_data, position_offsets) = self.build_positions_file()?;
796
797        // Extract data needed for parallel processing
798        let store_path = self.store_path.clone();
799        let schema = self.schema.clone();
800        let num_compression_threads = self.config.num_compression_threads;
801        let compression_level = self.config.compression_level;
802
803        // Build postings and document store in parallel
804        let (postings_result, store_result) = rayon::join(
805            || self.build_postings(&position_offsets),
806            || {
807                Self::build_store_parallel(
808                    &store_path,
809                    &schema,
810                    num_compression_threads,
811                    compression_level,
812                )
813            },
814        );
815
816        let (term_dict_data, postings_data) = postings_result?;
817        let store_data = store_result?;
818
819        // Write to directory
820        dir.write(&files.term_dict, &term_dict_data).await?;
821        dir.write(&files.postings, &postings_data).await?;
822        dir.write(&files.store, &store_data).await?;
823
824        // Write positions file (data only, offsets are in TermInfo)
825        if !positions_data.is_empty() {
826            dir.write(&files.positions, &positions_data).await?;
827        }
828
829        // Build and write dense vector indexes (RaBitQ) - all in one file
830        if !self.dense_vectors.is_empty() {
831            let vectors_data = self.build_vectors_file()?;
832            if !vectors_data.is_empty() {
833                dir.write(&files.vectors, &vectors_data).await?;
834            }
835        }
836
837        // Build and write sparse vector posting lists
838        if !self.sparse_vectors.is_empty() {
839            let sparse_data = self.build_sparse_file()?;
840            if !sparse_data.is_empty() {
841                dir.write(&files.sparse, &sparse_data).await?;
842            }
843        }
844
845        let meta = SegmentMeta {
846            id: segment_id.0,
847            num_docs: self.next_doc_id,
848            field_stats: self.field_stats.clone(),
849        };
850
851        dir.write(&files.meta, &meta.serialize()?).await?;
852
853        // Cleanup temp files
854        let _ = std::fs::remove_file(&self.store_path);
855
856        Ok(meta)
857    }
858
859    /// Build unified vectors file containing all dense vector indexes
860    ///
861    /// File format:
862    /// - Header: num_fields (u32)
863    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
864    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
865    fn build_vectors_file(&self) -> Result<Vec<u8>> {
866        use byteorder::{LittleEndian, WriteBytesExt};
867
868        // Build all indexes first: (field_id, index_type, data)
869        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
870
871        for (&field_id, builder) in &self.dense_vectors {
872            if builder.len() == 0 {
873                continue;
874            }
875
876            let field = crate::dsl::Field(field_id);
877
878            // Get dense vector config
879            let dense_config = self
880                .schema
881                .get_field_entry(field)
882                .and_then(|e| e.dense_vector_config.as_ref());
883
884            // Get vectors, potentially trimmed for matryoshka/MRL indexing
885            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
886            let vectors = if index_dim < builder.dim {
887                // Trim vectors to mrl_dim for indexing
888                builder.get_vectors_trimmed(index_dim)
889            } else {
890                builder.get_vectors()
891            };
892
893            // During normal indexing, segments always store Flat (raw vectors).
894            // ANN indexes are built at index-level via build_vector_index() which
895            // trains centroids/codebooks once from all vectors and triggers rebuild.
896            let flat_data = FlatVectorData {
897                dim: index_dim,
898                vectors: vectors.clone(),
899                doc_ids: builder.doc_ids.clone(),
900            };
901            let index_bytes = serde_json::to_vec(&flat_data)
902                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
903            let index_type = 3u8; // 3 = Flat
904
905            field_indexes.push((field_id, index_type, index_bytes));
906        }
907
908        if field_indexes.is_empty() {
909            return Ok(Vec::new());
910        }
911
912        // Sort by field_id for consistent ordering
913        field_indexes.sort_by_key(|(id, _, _)| *id);
914
915        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
916        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
917
918        // Build output
919        let mut output = Vec::new();
920
921        // Write number of fields
922        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
923
924        // Calculate offsets and write header entries
925        let mut current_offset = header_size as u64;
926        for (field_id, index_type, data) in &field_indexes {
927            output.write_u32::<LittleEndian>(*field_id)?;
928            output.write_u8(*index_type)?;
929            output.write_u64::<LittleEndian>(current_offset)?;
930            output.write_u64::<LittleEndian>(data.len() as u64)?;
931            current_offset += data.len() as u64;
932        }
933
934        // Write data
935        for (_, _, data) in field_indexes {
936            output.extend_from_slice(&data);
937        }
938
939        Ok(output)
940    }
941
942    /// Build sparse vectors file containing BlockSparsePostingList per field/dimension
943    ///
944    /// File format (direct-indexed table for O(1) dimension lookup):
945    /// - Header: num_fields (u32)
946    /// - For each field:
947    ///   - field_id (u32)
948    ///   - quantization (u8)
949    ///   - max_dim_id (u32)          ← highest dimension ID + 1 (table size)
950    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed by dim_id
951    ///     (offset=0, length=0 means dimension not present)
952    /// - Data: concatenated serialized BlockSparsePostingList
953    fn build_sparse_file(&self) -> Result<Vec<u8>> {
954        use crate::structures::{BlockSparsePostingList, WeightQuantization};
955        use byteorder::{LittleEndian, WriteBytesExt};
956
957        if self.sparse_vectors.is_empty() {
958            return Ok(Vec::new());
959        }
960
961        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> serialized_bytes)
962        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
963        let mut field_data: Vec<SparseFieldData> = Vec::new();
964
965        for (&field_id, builder) in &self.sparse_vectors {
966            if builder.is_empty() {
967                continue;
968            }
969
970            let field = crate::dsl::Field(field_id);
971
972            // Get quantization from field config
973            let quantization = self
974                .schema
975                .get_field_entry(field)
976                .and_then(|e| e.sparse_vector_config.as_ref())
977                .map(|c| c.weight_quantization)
978                .unwrap_or(WeightQuantization::Float32);
979
980            // Find max dimension ID
981            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);
982
983            // Build BlockSparsePostingList for each dimension
984            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
985
986            for (&dim_id, postings) in &builder.postings {
987                // Sort postings by doc_id
988                let mut sorted_postings = postings.clone();
989                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);
990
991                // Build BlockSparsePostingList
992                let block_list =
993                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
994                        .map_err(crate::Error::Io)?;
995
996                // Serialize
997                let mut bytes = Vec::new();
998                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
999
1000                dim_bytes.insert(dim_id, bytes);
1001            }
1002
1003            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
1004        }
1005
1006        if field_data.is_empty() {
1007            return Ok(Vec::new());
1008        }
1009
1010        // Sort by field_id
1011        field_data.sort_by_key(|(id, _, _, _)| *id);
1012
1013        // Calculate header size
1014        // Header: num_fields (4)
1015        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
1016        let mut header_size = 4u64;
1017        for (_, _, max_dim_id, _) in &field_data {
1018            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
1019            header_size += (*max_dim_id as u64) * 12; // table entries: (offset: u64, length: u32)
1020        }
1021
1022        // Build output
1023        let mut output = Vec::new();
1024
1025        // Write num_fields
1026        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
1027
1028        // Track current data offset (after all headers)
1029        let mut current_offset = header_size;
1030
1031        // First, collect all data bytes in order and build offset tables
1032        let mut all_data: Vec<u8> = Vec::new();
1033        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
1034
1035        for (_, _, max_dim_id, dim_bytes) in &field_data {
1036            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
1037
1038            // Process dimensions in order
1039            for dim_id in 0..*max_dim_id {
1040                if let Some(bytes) = dim_bytes.get(&dim_id) {
1041                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
1042                    current_offset += bytes.len() as u64;
1043                    all_data.extend_from_slice(bytes);
1044                }
1045                // else: table entry stays (0, 0) meaning dimension not present
1046            }
1047
1048            field_tables.push(table);
1049        }
1050
1051        // Write field headers and tables
1052        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
1053            output.write_u32::<LittleEndian>(*field_id)?;
1054            output.write_u8(*quantization as u8)?;
1055            output.write_u32::<LittleEndian>(*max_dim_id)?;
1056
1057            // Write table (direct indexed by dim_id)
1058            for &(offset, length) in &field_tables[i] {
1059                output.write_u64::<LittleEndian>(offset)?;
1060                output.write_u32::<LittleEndian>(length)?;
1061            }
1062        }
1063
1064        // Write data
1065        output.extend_from_slice(&all_data);
1066
1067        Ok(output)
1068    }
1069
1070    /// Build positions file for phrase queries
1071    ///
1072    /// File format:
1073    /// - Data only: concatenated serialized PositionPostingList
1074    /// - Position offsets are stored in TermInfo (no separate header needed)
1075    ///
1076    /// Returns: (positions_data, term_key -> (offset, len) mapping)
1077    #[allow(clippy::type_complexity)]
1078    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
1079        use crate::structures::PositionPostingList;
1080
1081        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1082
1083        if self.position_index.is_empty() {
1084            return Ok((Vec::new(), position_offsets));
1085        }
1086
1087        // Collect and sort entries by key
1088        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
1089            .position_index
1090            .iter()
1091            .map(|(term_key, pos_list)| {
1092                let term_str = self.term_interner.resolve(&term_key.term);
1093                let mut key = Vec::with_capacity(4 + term_str.len());
1094                key.extend_from_slice(&term_key.field.to_le_bytes());
1095                key.extend_from_slice(term_str.as_bytes());
1096                (key, pos_list)
1097            })
1098            .collect();
1099
1100        entries.sort_by(|a, b| a.0.cmp(&b.0));
1101
1102        // Serialize all position lists and track offsets
1103        let mut output = Vec::new();
1104
1105        for (key, pos_builder) in entries {
1106            // Convert builder to PositionPostingList
1107            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1108            for (doc_id, positions) in &pos_builder.postings {
1109                pos_list.push(*doc_id, positions.clone());
1110            }
1111
1112            // Serialize and track offset
1113            let offset = output.len() as u64;
1114            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
1115            let len = (output.len() as u64 - offset) as u32;
1116
1117            position_offsets.insert(key, (offset, len));
1118        }
1119
1120        Ok((output, position_offsets))
1121    }
1122
1123    /// Build postings from inverted index
1124    ///
1125    /// Uses parallel processing to serialize posting lists concurrently.
1126    /// Position offsets are looked up and embedded in TermInfo.
1127    fn build_postings(
1128        &mut self,
1129        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1130    ) -> Result<(Vec<u8>, Vec<u8>)> {
1131        // Phase 1: Collect and sort term keys (parallel key generation)
1132        // Key format: field_id (4 bytes) + term bytes
1133        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
1134            .inverted_index
1135            .iter()
1136            .map(|(term_key, posting_list)| {
1137                let term_str = self.term_interner.resolve(&term_key.term);
1138                let mut key = Vec::with_capacity(4 + term_str.len());
1139                key.extend_from_slice(&term_key.field.to_le_bytes());
1140                key.extend_from_slice(term_str.as_bytes());
1141                (key, posting_list)
1142            })
1143            .collect();
1144
1145        // Sort by key for SSTable ordering
1146        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1147
1148        // Phase 2: Parallel serialization of posting lists
1149        // Each term's posting list is serialized independently
1150        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1151            .into_par_iter()
1152            .map(|(key, posting_builder)| {
1153                // Build posting list from in-memory postings
1154                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1155                for p in &posting_builder.postings {
1156                    full_postings.push(p.doc_id, p.term_freq as u32);
1157                }
1158
1159                // Build term info
1160                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1161                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1162
1163                // Don't inline if term has positions (inline format doesn't support position offsets)
1164                let has_positions = position_offsets.contains_key(&key);
1165                let result = if !has_positions
1166                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1167                {
1168                    SerializedPosting::Inline(inline)
1169                } else {
1170                    // Serialize to local buffer
1171                    let mut posting_bytes = Vec::new();
1172                    let block_list =
1173                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
1174                            .expect("BlockPostingList creation failed");
1175                    block_list
1176                        .serialize(&mut posting_bytes)
1177                        .expect("BlockPostingList serialization failed");
1178                    SerializedPosting::External {
1179                        bytes: posting_bytes,
1180                        doc_count: full_postings.doc_count(),
1181                    }
1182                };
1183
1184                (key, result)
1185            })
1186            .collect();
1187
1188        // Phase 3: Sequential assembly (must be sequential for offset calculation)
1189        let mut term_dict = Vec::new();
1190        let mut postings = Vec::new();
1191        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1192
1193        for (key, serialized_posting) in serialized {
1194            let term_info = match serialized_posting {
1195                SerializedPosting::Inline(info) => info,
1196                SerializedPosting::External { bytes, doc_count } => {
1197                    let posting_offset = postings.len() as u64;
1198                    let posting_len = bytes.len() as u32;
1199                    postings.extend_from_slice(&bytes);
1200
1201                    // Look up position offset for this term
1202                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1203                        TermInfo::external_with_positions(
1204                            posting_offset,
1205                            posting_len,
1206                            doc_count,
1207                            pos_offset,
1208                            pos_len,
1209                        )
1210                    } else {
1211                        TermInfo::external(posting_offset, posting_len, doc_count)
1212                    }
1213                }
1214            };
1215
1216            writer.insert(&key, &term_info)?;
1217        }
1218
1219        writer.finish()?;
1220        Ok((term_dict, postings))
1221    }
1222
1223    /// Build document store from streamed temp file (static method for parallel execution)
1224    ///
1225    /// Uses parallel processing to deserialize documents concurrently.
1226    fn build_store_parallel(
1227        store_path: &PathBuf,
1228        schema: &Schema,
1229        num_compression_threads: usize,
1230        compression_level: CompressionLevel,
1231    ) -> Result<Vec<u8>> {
1232        use super::store::EagerParallelStoreWriter;
1233
1234        let file = File::open(store_path)?;
1235        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1236
1237        // Phase 1: Parse document boundaries (sequential, fast)
1238        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1239        let mut offset = 0usize;
1240        while offset + 4 <= mmap.len() {
1241            let doc_len = u32::from_le_bytes([
1242                mmap[offset],
1243                mmap[offset + 1],
1244                mmap[offset + 2],
1245                mmap[offset + 3],
1246            ]) as usize;
1247            offset += 4;
1248
1249            if offset + doc_len > mmap.len() {
1250                break;
1251            }
1252
1253            doc_ranges.push((offset, doc_len));
1254            offset += doc_len;
1255        }
1256
1257        // Phase 2: Parallel deserialization of documents
1258        let docs: Vec<Document> = doc_ranges
1259            .into_par_iter()
1260            .filter_map(|(start, len)| {
1261                let doc_bytes = &mmap[start..start + len];
1262                super::store::deserialize_document(doc_bytes, schema).ok()
1263            })
1264            .collect();
1265
1266        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
1267        let mut store_data = Vec::new();
1268        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1269            &mut store_data,
1270            num_compression_threads,
1271            compression_level,
1272        );
1273
1274        for doc in &docs {
1275            store_writer.store(doc, schema)?;
1276        }
1277
1278        store_writer.finish()?;
1279        Ok(store_data)
1280    }
1281}
1282
1283impl Drop for SegmentBuilder {
1284    fn drop(&mut self) {
1285        // Cleanup temp files on drop
1286        let _ = std::fs::remove_file(&self.store_path);
1287    }
1288}