Skip to main content

hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14
15use hashbrown::HashMap;
16use lasso::{Rodeo, Spur};
17use rayon::prelude::*;
18use rustc_hash::FxHashMap;
19
20use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
21use crate::compression::CompressionLevel;
22use crate::directories::{Directory, DirectoryWriter};
23use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
24use crate::structures::{PostingList, SSTableWriter, TermInfo};
25use crate::tokenizer::BoxedTokenizer;
26use crate::{DocId, Result};
27
28/// Size of the document store buffer before writing to disk
29const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
30
31/// Interned term key combining field and term
32#[derive(Clone, Copy, PartialEq, Eq, Hash)]
33struct TermKey {
34    field: u32,
35    term: Spur,
36}
37
38/// Compact posting entry for in-memory storage
39#[derive(Clone, Copy)]
40struct CompactPosting {
41    doc_id: DocId,
42    term_freq: u16,
43}
44
45/// In-memory posting list for a term
46struct PostingListBuilder {
47    /// In-memory postings
48    postings: Vec<CompactPosting>,
49}
50
51impl PostingListBuilder {
52    fn new() -> Self {
53        Self {
54            postings: Vec::new(),
55        }
56    }
57
58    /// Add a posting, merging if same doc_id as last
59    #[inline]
60    fn add(&mut self, doc_id: DocId, term_freq: u32) {
61        // Check if we can merge with the last posting
62        if let Some(last) = self.postings.last_mut()
63            && last.doc_id == doc_id
64        {
65            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
66            return;
67        }
68        self.postings.push(CompactPosting {
69            doc_id,
70            term_freq: term_freq.min(u16::MAX as u32) as u16,
71        });
72    }
73
74    fn len(&self) -> usize {
75        self.postings.len()
76    }
77}
78
79/// In-memory position posting list for a term (for fields with record_positions=true)
80struct PositionPostingListBuilder {
81    /// Doc ID -> list of positions (encoded as element_ordinal << 20 | token_position)
82    postings: Vec<(DocId, Vec<u32>)>,
83}
84
85impl PositionPostingListBuilder {
86    fn new() -> Self {
87        Self {
88            postings: Vec::new(),
89        }
90    }
91
92    /// Add a position for a document
93    #[inline]
94    fn add_position(&mut self, doc_id: DocId, position: u32) {
95        if let Some((last_doc, positions)) = self.postings.last_mut()
96            && *last_doc == doc_id
97        {
98            positions.push(position);
99            return;
100        }
101        self.postings.push((doc_id, vec![position]));
102    }
103}
104
105/// Intermediate result for parallel posting serialization
106enum SerializedPosting {
107    /// Inline posting (small enough to fit in TermInfo)
108    Inline(TermInfo),
109    /// External posting with serialized bytes
110    External { bytes: Vec<u8>, doc_count: u32 },
111}
112
113/// Statistics for debugging segment builder performance
114#[derive(Debug, Clone)]
115pub struct SegmentBuilderStats {
116    /// Number of documents indexed
117    pub num_docs: u32,
118    /// Number of unique terms in the inverted index
119    pub unique_terms: usize,
120    /// Total postings in memory (across all terms)
121    pub postings_in_memory: usize,
122    /// Number of interned strings
123    pub interned_strings: usize,
124    /// Size of doc_field_lengths vector
125    pub doc_field_lengths_size: usize,
126    /// Estimated total memory usage in bytes
127    pub estimated_memory_bytes: usize,
128    /// Memory breakdown by component
129    pub memory_breakdown: MemoryBreakdown,
130}
131
132/// Detailed memory breakdown by component
133#[derive(Debug, Clone, Default)]
134pub struct MemoryBreakdown {
135    /// Postings memory (CompactPosting structs)
136    pub postings_bytes: usize,
137    /// Inverted index HashMap overhead
138    pub index_overhead_bytes: usize,
139    /// Term interner memory
140    pub interner_bytes: usize,
141    /// Document field lengths
142    pub field_lengths_bytes: usize,
143    /// Dense vector storage
144    pub dense_vectors_bytes: usize,
145    /// Number of dense vectors
146    pub dense_vector_count: usize,
147}
148
149/// Configuration for segment builder
150#[derive(Clone)]
151pub struct SegmentBuilderConfig {
152    /// Directory for temporary spill files
153    pub temp_dir: PathBuf,
154    /// Compression level for document store
155    pub compression_level: CompressionLevel,
156    /// Number of threads for parallel compression
157    pub num_compression_threads: usize,
158    /// Initial capacity for term interner
159    pub interner_capacity: usize,
160    /// Initial capacity for posting lists hashmap
161    pub posting_map_capacity: usize,
162}
163
164impl Default for SegmentBuilderConfig {
165    fn default() -> Self {
166        Self {
167            temp_dir: std::env::temp_dir(),
168            compression_level: CompressionLevel(7),
169            num_compression_threads: num_cpus::get(),
170            interner_capacity: 1_000_000,
171            posting_map_capacity: 500_000,
172        }
173    }
174}
175
176/// Segment builder with optimized memory usage
177///
178/// Features:
179/// - Streams documents to disk immediately (no in-memory document storage)
180/// - Uses string interning for terms (reduced allocations)
181/// - Uses hashbrown HashMap (faster than BTreeMap)
182pub struct SegmentBuilder {
183    schema: Schema,
184    config: SegmentBuilderConfig,
185    tokenizers: FxHashMap<Field, BoxedTokenizer>,
186
187    /// String interner for terms - O(1) lookup and deduplication
188    term_interner: Rodeo,
189
190    /// Inverted index: term key -> posting list
191    inverted_index: HashMap<TermKey, PostingListBuilder>,
192
193    /// Streaming document store writer
194    store_file: BufWriter<File>,
195    store_path: PathBuf,
196
197    /// Document count
198    next_doc_id: DocId,
199
200    /// Per-field statistics for BM25F
201    field_stats: FxHashMap<u32, FieldStats>,
202
203    /// Per-document field lengths stored compactly
204    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
205    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
206    doc_field_lengths: Vec<u32>,
207    num_indexed_fields: usize,
208    field_to_slot: FxHashMap<u32, usize>,
209
210    /// Reusable buffer for per-document term frequency aggregation
211    /// Avoids allocating a new hashmap for each document
212    local_tf_buffer: FxHashMap<Spur, u32>,
213
214    /// Reusable buffer for tokenization to avoid per-token String allocations
215    token_buffer: String,
216
217    /// Dense vector storage per field: field -> (doc_ids, vectors)
218    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
219    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
220
221    /// Sparse vector storage per field: field -> SparseVectorBuilder
222    /// Uses proper BlockSparsePostingList with configurable quantization
223    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,
224
225    /// Position index for fields with positions enabled
226    /// term key -> position posting list
227    position_index: HashMap<TermKey, PositionPostingListBuilder>,
228
229    /// Fields that have position tracking enabled, with their mode
230    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,
231
232    /// Current element ordinal for multi-valued fields (reset per document)
233    current_element_ordinal: FxHashMap<u32, u32>,
234}
235
236/// Builder for dense vector index using RaBitQ
237struct DenseVectorBuilder {
238    /// Dimension of vectors
239    dim: usize,
240    /// Document IDs with vectors
241    doc_ids: Vec<DocId>,
242    /// Flat vector storage (doc_ids.len() * dim floats)
243    vectors: Vec<f32>,
244}
245
246impl DenseVectorBuilder {
247    fn new(dim: usize) -> Self {
248        Self {
249            dim,
250            doc_ids: Vec::new(),
251            vectors: Vec::new(),
252        }
253    }
254
255    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
256        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
257        self.doc_ids.push(doc_id);
258        self.vectors.extend_from_slice(vector);
259    }
260
261    fn len(&self) -> usize {
262        self.doc_ids.len()
263    }
264
265    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
266    fn get_vectors(&self) -> Vec<Vec<f32>> {
267        self.doc_ids
268            .iter()
269            .enumerate()
270            .map(|(i, _)| {
271                let start = i * self.dim;
272                self.vectors[start..start + self.dim].to_vec()
273            })
274            .collect()
275    }
276
277    /// Get vectors trimmed to specified dimension for matryoshka/MRL indexing
278    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
279        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
280        self.doc_ids
281            .iter()
282            .enumerate()
283            .map(|(i, _)| {
284                let start = i * self.dim;
285                self.vectors[start..start + trim_dim].to_vec()
286            })
287            .collect()
288    }
289}
290
291/// Builder for sparse vector index using BlockSparsePostingList
292///
293/// Collects (doc_id, weight) postings per dimension, then builds
294/// BlockSparsePostingList with proper quantization during commit.
295struct SparseVectorBuilder {
296    /// Postings per dimension: dim_id -> Vec<(doc_id, weight)>
297    postings: FxHashMap<u32, Vec<(DocId, f32)>>,
298}
299
300impl SparseVectorBuilder {
301    fn new() -> Self {
302        Self {
303            postings: FxHashMap::default(),
304        }
305    }
306
307    /// Add a sparse vector entry
308    #[inline]
309    fn add(&mut self, dim_id: u32, doc_id: DocId, weight: f32) {
310        self.postings
311            .entry(dim_id)
312            .or_default()
313            .push((doc_id, weight));
314    }
315
316    fn is_empty(&self) -> bool {
317        self.postings.is_empty()
318    }
319}
320
321impl SegmentBuilder {
322    /// Create a new segment builder
323    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
324        let segment_id = uuid::Uuid::new_v4();
325        let store_path = config
326            .temp_dir
327            .join(format!("hermes_store_{}.tmp", segment_id));
328
329        let store_file = BufWriter::with_capacity(
330            STORE_BUFFER_SIZE,
331            OpenOptions::new()
332                .create(true)
333                .write(true)
334                .truncate(true)
335                .open(&store_path)?,
336        );
337
338        // Count indexed fields for compact field length storage
339        // Also track which fields have position recording enabled
340        let mut num_indexed_fields = 0;
341        let mut field_to_slot = FxHashMap::default();
342        let mut position_enabled_fields = FxHashMap::default();
343        for (field, entry) in schema.fields() {
344            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
345                field_to_slot.insert(field.0, num_indexed_fields);
346                num_indexed_fields += 1;
347                if entry.positions.is_some() {
348                    position_enabled_fields.insert(field.0, entry.positions);
349                }
350            }
351        }
352
353        Ok(Self {
354            schema,
355            tokenizers: FxHashMap::default(),
356            term_interner: Rodeo::new(),
357            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
358            store_file,
359            store_path,
360            next_doc_id: 0,
361            field_stats: FxHashMap::default(),
362            doc_field_lengths: Vec::new(),
363            num_indexed_fields,
364            field_to_slot,
365            local_tf_buffer: FxHashMap::default(),
366            token_buffer: String::with_capacity(64),
367            config,
368            dense_vectors: FxHashMap::default(),
369            sparse_vectors: FxHashMap::default(),
370            position_index: HashMap::new(),
371            position_enabled_fields,
372            current_element_ordinal: FxHashMap::default(),
373        })
374    }
375
376    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
377        self.tokenizers.insert(field, tokenizer);
378    }
379
380    pub fn num_docs(&self) -> u32 {
381        self.next_doc_id
382    }
383
384    /// Get current statistics for debugging performance
385    pub fn stats(&self) -> SegmentBuilderStats {
386        use std::mem::size_of;
387
388        let postings_in_memory: usize =
389            self.inverted_index.values().map(|p| p.postings.len()).sum();
390
391        // Precise memory calculation using actual struct sizes
392        // CompactPosting: doc_id (u32) + term_freq (u16) = 6 bytes, but may have padding
393        let compact_posting_size = size_of::<CompactPosting>();
394
395        // Postings: actual Vec capacity * element size + Vec overhead (24 bytes on 64-bit)
396        let postings_bytes: usize = self
397            .inverted_index
398            .values()
399            .map(|p| {
400                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
401            })
402            .sum();
403
404        // Inverted index: HashMap overhead per entry
405        // Each entry: TermKey (field u32 + Spur 4 bytes = 8 bytes) + PostingListBuilder + HashMap bucket overhead
406        // HashMap typically uses ~1.5x capacity, each bucket ~16-24 bytes
407        let term_key_size = size_of::<TermKey>();
408        let posting_builder_size = size_of::<PostingListBuilder>();
409        let hashmap_entry_overhead = 24; // bucket pointer + metadata
410        let index_overhead_bytes = self.inverted_index.len()
411            * (term_key_size + posting_builder_size + hashmap_entry_overhead);
412
413        // Term interner: Rodeo stores strings + metadata
414        // Each interned string: actual string bytes + Spur (4 bytes) + internal overhead (~16 bytes)
415        // We can't get exact string lengths, so estimate average term length of 8 bytes
416        let avg_term_len = 8;
417        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
418        let interner_bytes =
419            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);
420
421        // Doc field lengths: Vec<u32> with capacity
422        let field_lengths_bytes =
423            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
424
425        // Dense vectors: actual capacity used
426        let mut dense_vectors_bytes: usize = 0;
427        let mut dense_vector_count: usize = 0;
428        for b in self.dense_vectors.values() {
429            // vectors: Vec<f32> capacity + doc_ids: Vec<DocId> capacity
430            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
431                + b.doc_ids.capacity() * size_of::<DocId>()
432                + size_of::<Vec<f32>>()
433                + size_of::<Vec<DocId>>();
434            dense_vector_count += b.doc_ids.len();
435        }
436
437        // Local buffers
438        let local_tf_buffer_bytes =
439            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);
440
441        let estimated_memory_bytes = postings_bytes
442            + index_overhead_bytes
443            + interner_bytes
444            + field_lengths_bytes
445            + dense_vectors_bytes
446            + local_tf_buffer_bytes;
447
448        let memory_breakdown = MemoryBreakdown {
449            postings_bytes,
450            index_overhead_bytes,
451            interner_bytes,
452            field_lengths_bytes,
453            dense_vectors_bytes,
454            dense_vector_count,
455        };
456
457        SegmentBuilderStats {
458            num_docs: self.next_doc_id,
459            unique_terms: self.inverted_index.len(),
460            postings_in_memory,
461            interned_strings: self.term_interner.len(),
462            doc_field_lengths_size: self.doc_field_lengths.len(),
463            estimated_memory_bytes,
464            memory_breakdown,
465        }
466    }
467
468    /// Add a document - streams to disk immediately
469    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
470        let doc_id = self.next_doc_id;
471        self.next_doc_id += 1;
472
473        // Initialize field lengths for this document
474        let base_idx = self.doc_field_lengths.len();
475        self.doc_field_lengths
476            .resize(base_idx + self.num_indexed_fields, 0);
477
478        // Reset element ordinals for this document (for multi-valued fields)
479        self.current_element_ordinal.clear();
480
481        for (field, value) in doc.field_values() {
482            let entry = self.schema.get_field_entry(*field);
483            if entry.is_none() || !entry.unwrap().indexed {
484                continue;
485            }
486
487            let entry = entry.unwrap();
488            match (&entry.field_type, value) {
489                (FieldType::Text, FieldValue::Text(text)) => {
490                    // Get current element ordinal for multi-valued fields
491                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
492                    let token_count =
493                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
494                    // Increment element ordinal for next value of this field
495                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
496
497                    // Update field statistics
498                    let stats = self.field_stats.entry(field.0).or_default();
499                    stats.total_tokens += token_count as u64;
500                    stats.doc_count += 1;
501
502                    // Store field length compactly
503                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
504                        self.doc_field_lengths[base_idx + slot] = token_count;
505                    }
506                }
507                (FieldType::U64, FieldValue::U64(v)) => {
508                    self.index_numeric_field(*field, doc_id, *v)?;
509                }
510                (FieldType::I64, FieldValue::I64(v)) => {
511                    self.index_numeric_field(*field, doc_id, *v as u64)?;
512                }
513                (FieldType::F64, FieldValue::F64(v)) => {
514                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
515                }
516                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
517                    self.index_dense_vector_field(*field, doc_id, vec)?;
518                }
519                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
520                    self.index_sparse_vector_field(*field, doc_id, entries)?;
521                }
522                _ => {}
523            }
524        }
525
526        // Stream document to disk immediately
527        self.write_document_to_store(&doc)?;
528
529        Ok(doc_id)
530    }
531
532    /// Index a text field using interned terms
533    ///
534    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
535    /// Instead of allocating a String per token, we:
536    /// 1. Iterate over whitespace-split words
537    /// 2. Build lowercase token in a reusable buffer
538    /// 3. Intern directly from the buffer
539    ///
540    /// If position recording is enabled for this field, also records token positions
541    /// encoded as (element_ordinal << 20) | token_position.
542    fn index_text_field(
543        &mut self,
544        field: Field,
545        doc_id: DocId,
546        text: &str,
547        element_ordinal: u32,
548    ) -> Result<u32> {
549        use crate::dsl::PositionMode;
550
551        let field_id = field.0;
552        let position_mode = self
553            .position_enabled_fields
554            .get(&field_id)
555            .copied()
556            .flatten();
557
558        // Phase 1: Aggregate term frequencies within this document
559        // Also collect positions if enabled
560        // Reuse buffer to avoid allocations
561        self.local_tf_buffer.clear();
562
563        // For position tracking: term -> list of positions in this text
564        let mut local_positions: FxHashMap<Spur, Vec<u32>> = FxHashMap::default();
565
566        let mut token_position = 0u32;
567
568        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
569        for word in text.split_whitespace() {
570            // Build lowercase token in reusable buffer
571            self.token_buffer.clear();
572            for c in word.chars() {
573                if c.is_alphanumeric() {
574                    for lc in c.to_lowercase() {
575                        self.token_buffer.push(lc);
576                    }
577                }
578            }
579
580            if self.token_buffer.is_empty() {
581                continue;
582            }
583
584            // Intern the term directly from buffer - O(1) amortized
585            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
586            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
587
588            // Record position based on mode
589            if let Some(mode) = position_mode {
590                let encoded_pos = match mode {
591                    // Ordinal only: just store element ordinal (token position = 0)
592                    PositionMode::Ordinal => element_ordinal << 20,
593                    // Token position only: just store token position (ordinal = 0)
594                    PositionMode::TokenPosition => token_position,
595                    // Full: encode both
596                    PositionMode::Full => (element_ordinal << 20) | token_position,
597                };
598                local_positions
599                    .entry(term_spur)
600                    .or_default()
601                    .push(encoded_pos);
602            }
603
604            token_position += 1;
605        }
606
607        // Phase 2: Insert aggregated terms into inverted index
608        // Now we only do one inverted_index lookup per unique term in doc
609        for (&term_spur, &tf) in &self.local_tf_buffer {
610            let term_key = TermKey {
611                field: field_id,
612                term: term_spur,
613            };
614
615            let posting = self
616                .inverted_index
617                .entry(term_key)
618                .or_insert_with(PostingListBuilder::new);
619            posting.add(doc_id, tf);
620
621            // Add positions if enabled
622            if position_mode.is_some()
623                && let Some(positions) = local_positions.get(&term_spur)
624            {
625                let pos_posting = self
626                    .position_index
627                    .entry(term_key)
628                    .or_insert_with(PositionPostingListBuilder::new);
629                for &pos in positions {
630                    pos_posting.add_position(doc_id, pos);
631                }
632            }
633        }
634
635        Ok(token_position)
636    }
637
638    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
639        // For numeric fields, we use a special encoding
640        let term_str = format!("__num_{}", value);
641        let term_spur = self.term_interner.get_or_intern(&term_str);
642
643        let term_key = TermKey {
644            field: field.0,
645            term: term_spur,
646        };
647
648        let posting = self
649            .inverted_index
650            .entry(term_key)
651            .or_insert_with(PostingListBuilder::new);
652        posting.add(doc_id, 1);
653
654        Ok(())
655    }
656
657    /// Index a dense vector field
658    fn index_dense_vector_field(
659        &mut self,
660        field: Field,
661        doc_id: DocId,
662        vector: &[f32],
663    ) -> Result<()> {
664        let dim = vector.len();
665
666        let builder = self
667            .dense_vectors
668            .entry(field.0)
669            .or_insert_with(|| DenseVectorBuilder::new(dim));
670
671        // Verify dimension consistency
672        if builder.dim != dim && builder.len() > 0 {
673            return Err(crate::Error::Schema(format!(
674                "Dense vector dimension mismatch: expected {}, got {}",
675                builder.dim, dim
676            )));
677        }
678
679        builder.add(doc_id, vector);
680        Ok(())
681    }
682
683    /// Index a sparse vector field using dedicated sparse posting lists
684    ///
685    /// Collects (doc_id, weight) postings per dimension. During commit, these are
686    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
687    ///
688    /// Weights below the configured `weight_threshold` are not indexed.
689    fn index_sparse_vector_field(
690        &mut self,
691        field: Field,
692        doc_id: DocId,
693        entries: &[(u32, f32)],
694    ) -> Result<()> {
695        // Get weight threshold from field config (default 0.0 = no filtering)
696        let weight_threshold = self
697            .schema
698            .get_field_entry(field)
699            .and_then(|entry| entry.sparse_vector_config.as_ref())
700            .map(|config| config.weight_threshold)
701            .unwrap_or(0.0);
702
703        let builder = self
704            .sparse_vectors
705            .entry(field.0)
706            .or_insert_with(SparseVectorBuilder::new);
707
708        for &(dim_id, weight) in entries {
709            // Skip weights below threshold
710            if weight.abs() < weight_threshold {
711                continue;
712            }
713
714            builder.add(dim_id, doc_id, weight);
715        }
716
717        Ok(())
718    }
719
720    /// Write document to streaming store
721    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
722        use byteorder::{LittleEndian, WriteBytesExt};
723
724        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
725
726        self.store_file
727            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
728        self.store_file.write_all(&doc_bytes)?;
729
730        Ok(())
731    }
732
733    /// Build the final segment
734    pub async fn build<D: Directory + DirectoryWriter>(
735        mut self,
736        dir: &D,
737        segment_id: SegmentId,
738    ) -> Result<SegmentMeta> {
739        // Flush any buffered data
740        self.store_file.flush()?;
741
742        let files = SegmentFiles::new(segment_id.0);
743
744        // Build positions FIRST to get offsets for TermInfo
745        let (positions_data, position_offsets) = self.build_positions_file()?;
746
747        // Extract data needed for parallel processing
748        let store_path = self.store_path.clone();
749        let schema = self.schema.clone();
750        let num_compression_threads = self.config.num_compression_threads;
751        let compression_level = self.config.compression_level;
752
753        // Build postings and document store in parallel
754        let (postings_result, store_result) = rayon::join(
755            || self.build_postings(&position_offsets),
756            || {
757                Self::build_store_parallel(
758                    &store_path,
759                    &schema,
760                    num_compression_threads,
761                    compression_level,
762                )
763            },
764        );
765
766        let (term_dict_data, postings_data) = postings_result?;
767        let store_data = store_result?;
768
769        // Write to directory
770        dir.write(&files.term_dict, &term_dict_data).await?;
771        dir.write(&files.postings, &postings_data).await?;
772        dir.write(&files.store, &store_data).await?;
773
774        // Write positions file (data only, offsets are in TermInfo)
775        if !positions_data.is_empty() {
776            dir.write(&files.positions, &positions_data).await?;
777        }
778
779        // Build and write dense vector indexes (RaBitQ) - all in one file
780        if !self.dense_vectors.is_empty() {
781            let vectors_data = self.build_vectors_file()?;
782            if !vectors_data.is_empty() {
783                dir.write(&files.vectors, &vectors_data).await?;
784            }
785        }
786
787        // Build and write sparse vector posting lists
788        if !self.sparse_vectors.is_empty() {
789            let sparse_data = self.build_sparse_file()?;
790            if !sparse_data.is_empty() {
791                dir.write(&files.sparse, &sparse_data).await?;
792            }
793        }
794
795        let meta = SegmentMeta {
796            id: segment_id.0,
797            num_docs: self.next_doc_id,
798            field_stats: self.field_stats.clone(),
799        };
800
801        dir.write(&files.meta, &meta.serialize()?).await?;
802
803        // Cleanup temp files
804        let _ = std::fs::remove_file(&self.store_path);
805
806        Ok(meta)
807    }
808
809    /// Build unified vectors file containing all dense vector indexes
810    ///
811    /// File format:
812    /// - Header: num_fields (u32)
813    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
814    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
815    fn build_vectors_file(&self) -> Result<Vec<u8>> {
816        use crate::dsl::VectorIndexType;
817        use byteorder::{LittleEndian, WriteBytesExt};
818
819        // Build all indexes first: (field_id, index_type, data)
820        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
821
822        for (&field_id, builder) in &self.dense_vectors {
823            if builder.len() == 0 {
824                continue;
825            }
826
827            let field = crate::dsl::Field(field_id);
828
829            // Get dense vector config
830            let dense_config = self
831                .schema
832                .get_field_entry(field)
833                .and_then(|e| e.dense_vector_config.as_ref());
834
835            // Get vectors, potentially trimmed for matryoshka/MRL indexing
836            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
837            let vectors = if index_dim < builder.dim {
838                // Trim vectors to mrl_dim for indexing
839                builder.get_vectors_trimmed(index_dim)
840            } else {
841                builder.get_vectors()
842            };
843
844            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
845                Some(VectorIndexType::ScaNN) => {
846                    // ScaNN (IVF-PQ) index
847                    let config = dense_config.unwrap();
848                    let centroids_path =
849                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
850                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
851                        })?;
852                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
853                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
854                    })?;
855
856                    let coarse_centroids = crate::structures::CoarseCentroids::load(
857                        std::path::Path::new(centroids_path),
858                    )
859                    .map_err(crate::Error::Io)?;
860
861                    let pq_codebook =
862                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
863                            .map_err(crate::Error::Io)?;
864
865                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
866                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
867                        .with_store_raw(config.store_raw);
868
869                    let ivfpq_index = crate::structures::IVFPQIndex::build(
870                        ivfpq_config,
871                        &coarse_centroids,
872                        &pq_codebook,
873                        &vectors,
874                        Some(doc_ids.as_slice()),
875                    );
876
877                    // Serialize ScaNN index (IVFPQIndex only - codebook loaded separately)
878                    let bytes = ivfpq_index
879                        .to_bytes()
880                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
881                    (2u8, bytes) // 2 = ScaNN
882                }
883                Some(VectorIndexType::IvfRaBitQ) => {
884                    // IVF-RaBitQ index
885                    let config = dense_config.unwrap();
886                    let centroids_path =
887                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
888                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
889                        })?;
890
891                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
892                        centroids_path,
893                    )) {
894                        Ok(coarse_centroids) => {
895                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
896                                .with_store_raw(config.store_raw);
897                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
898                                crate::structures::RaBitQConfig::new(index_dim),
899                            );
900                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
901                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
902                                ivf_cfg,
903                                &coarse_centroids,
904                                &rabitq_codebook,
905                                &vectors,
906                                Some(doc_ids.as_slice()),
907                            );
908                            let bytes = ivf_index
909                                .to_bytes()
910                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
911                            (1u8, bytes) // 1 = IVF-RaBitQ
912                        }
913                        Err(e) => {
914                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
915                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
916                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
917                            let bytes = serde_json::to_vec(&idx)
918                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
919                            (0u8, bytes) // 0 = RaBitQ
920                        }
921                    }
922                }
923                _ => {
924                    // Default: RaBitQ
925                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
926                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
927                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
928                    let bytes = serde_json::to_vec(&idx)
929                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
930                    (0u8, bytes) // 0 = RaBitQ
931                }
932            };
933
934            field_indexes.push((field_id, index_type, index_bytes));
935        }
936
937        if field_indexes.is_empty() {
938            return Ok(Vec::new());
939        }
940
941        // Sort by field_id for consistent ordering
942        field_indexes.sort_by_key(|(id, _, _)| *id);
943
944        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
945        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
946
947        // Build output
948        let mut output = Vec::new();
949
950        // Write number of fields
951        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
952
953        // Calculate offsets and write header entries
954        let mut current_offset = header_size as u64;
955        for (field_id, index_type, data) in &field_indexes {
956            output.write_u32::<LittleEndian>(*field_id)?;
957            output.write_u8(*index_type)?;
958            output.write_u64::<LittleEndian>(current_offset)?;
959            output.write_u64::<LittleEndian>(data.len() as u64)?;
960            current_offset += data.len() as u64;
961        }
962
963        // Write data
964        for (_, _, data) in field_indexes {
965            output.extend_from_slice(&data);
966        }
967
968        Ok(output)
969    }
970
971    /// Build sparse vectors file containing BlockSparsePostingList per field/dimension
972    ///
973    /// File format (direct-indexed table for O(1) dimension lookup):
974    /// - Header: num_fields (u32)
975    /// - For each field:
976    ///   - field_id (u32)
977    ///   - quantization (u8)
978    ///   - max_dim_id (u32)          ← highest dimension ID + 1 (table size)
979    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed by dim_id
980    ///     (offset=0, length=0 means dimension not present)
981    /// - Data: concatenated serialized BlockSparsePostingList
982    fn build_sparse_file(&self) -> Result<Vec<u8>> {
983        use crate::structures::{BlockSparsePostingList, WeightQuantization};
984        use byteorder::{LittleEndian, WriteBytesExt};
985
986        if self.sparse_vectors.is_empty() {
987            return Ok(Vec::new());
988        }
989
990        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> serialized_bytes)
991        type SparseFieldData = (u32, WeightQuantization, u32, FxHashMap<u32, Vec<u8>>);
992        let mut field_data: Vec<SparseFieldData> = Vec::new();
993
994        for (&field_id, builder) in &self.sparse_vectors {
995            if builder.is_empty() {
996                continue;
997            }
998
999            let field = crate::dsl::Field(field_id);
1000
1001            // Get quantization from field config
1002            let quantization = self
1003                .schema
1004                .get_field_entry(field)
1005                .and_then(|e| e.sparse_vector_config.as_ref())
1006                .map(|c| c.weight_quantization)
1007                .unwrap_or(WeightQuantization::Float32);
1008
1009            // Find max dimension ID
1010            let max_dim_id = builder.postings.keys().max().copied().unwrap_or(0);
1011
1012            // Build BlockSparsePostingList for each dimension
1013            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
1014
1015            for (&dim_id, postings) in &builder.postings {
1016                // Sort postings by doc_id
1017                let mut sorted_postings = postings.clone();
1018                sorted_postings.sort_by_key(|(doc_id, _)| *doc_id);
1019
1020                // Build BlockSparsePostingList
1021                let block_list =
1022                    BlockSparsePostingList::from_postings(&sorted_postings, quantization)
1023                        .map_err(crate::Error::Io)?;
1024
1025                // Serialize
1026                let mut bytes = Vec::new();
1027                block_list.serialize(&mut bytes).map_err(crate::Error::Io)?;
1028
1029                dim_bytes.insert(dim_id, bytes);
1030            }
1031
1032            field_data.push((field_id, quantization, max_dim_id + 1, dim_bytes));
1033        }
1034
1035        if field_data.is_empty() {
1036            return Ok(Vec::new());
1037        }
1038
1039        // Sort by field_id
1040        field_data.sort_by_key(|(id, _, _, _)| *id);
1041
1042        // Calculate header size
1043        // Header: num_fields (4)
1044        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
1045        let mut header_size = 4u64;
1046        for (_, _, max_dim_id, _) in &field_data {
1047            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
1048            header_size += (*max_dim_id as u64) * 12; // table entries: (offset: u64, length: u32)
1049        }
1050
1051        // Build output
1052        let mut output = Vec::new();
1053
1054        // Write num_fields
1055        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
1056
1057        // Track current data offset (after all headers)
1058        let mut current_offset = header_size;
1059
1060        // First, collect all data bytes in order and build offset tables
1061        let mut all_data: Vec<u8> = Vec::new();
1062        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
1063
1064        for (_, _, max_dim_id, dim_bytes) in &field_data {
1065            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
1066
1067            // Process dimensions in order
1068            for dim_id in 0..*max_dim_id {
1069                if let Some(bytes) = dim_bytes.get(&dim_id) {
1070                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
1071                    current_offset += bytes.len() as u64;
1072                    all_data.extend_from_slice(bytes);
1073                }
1074                // else: table entry stays (0, 0) meaning dimension not present
1075            }
1076
1077            field_tables.push(table);
1078        }
1079
1080        // Write field headers and tables
1081        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
1082            output.write_u32::<LittleEndian>(*field_id)?;
1083            output.write_u8(*quantization as u8)?;
1084            output.write_u32::<LittleEndian>(*max_dim_id)?;
1085
1086            // Write table (direct indexed by dim_id)
1087            for &(offset, length) in &field_tables[i] {
1088                output.write_u64::<LittleEndian>(offset)?;
1089                output.write_u32::<LittleEndian>(length)?;
1090            }
1091        }
1092
1093        // Write data
1094        output.extend_from_slice(&all_data);
1095
1096        Ok(output)
1097    }
1098
1099    /// Build positions file for phrase queries
1100    ///
1101    /// File format:
1102    /// - Data only: concatenated serialized PositionPostingList
1103    /// - Position offsets are stored in TermInfo (no separate header needed)
1104    ///
1105    /// Returns: (positions_data, term_key -> (offset, len) mapping)
1106    #[allow(clippy::type_complexity)]
1107    fn build_positions_file(&self) -> Result<(Vec<u8>, FxHashMap<Vec<u8>, (u64, u32)>)> {
1108        use crate::structures::PositionPostingList;
1109
1110        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();
1111
1112        if self.position_index.is_empty() {
1113            return Ok((Vec::new(), position_offsets));
1114        }
1115
1116        // Collect and sort entries by key
1117        let mut entries: Vec<(Vec<u8>, &PositionPostingListBuilder)> = self
1118            .position_index
1119            .iter()
1120            .map(|(term_key, pos_list)| {
1121                let term_str = self.term_interner.resolve(&term_key.term);
1122                let mut key = Vec::with_capacity(4 + term_str.len());
1123                key.extend_from_slice(&term_key.field.to_le_bytes());
1124                key.extend_from_slice(term_str.as_bytes());
1125                (key, pos_list)
1126            })
1127            .collect();
1128
1129        entries.sort_by(|a, b| a.0.cmp(&b.0));
1130
1131        // Serialize all position lists and track offsets
1132        let mut output = Vec::new();
1133
1134        for (key, pos_builder) in entries {
1135            // Convert builder to PositionPostingList
1136            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
1137            for (doc_id, positions) in &pos_builder.postings {
1138                pos_list.push(*doc_id, positions.clone());
1139            }
1140
1141            // Serialize and track offset
1142            let offset = output.len() as u64;
1143            pos_list.serialize(&mut output).map_err(crate::Error::Io)?;
1144            let len = (output.len() as u64 - offset) as u32;
1145
1146            position_offsets.insert(key, (offset, len));
1147        }
1148
1149        Ok((output, position_offsets))
1150    }
1151
1152    /// Build postings from inverted index
1153    ///
1154    /// Uses parallel processing to serialize posting lists concurrently.
1155    /// Position offsets are looked up and embedded in TermInfo.
1156    fn build_postings(
1157        &mut self,
1158        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
1159    ) -> Result<(Vec<u8>, Vec<u8>)> {
1160        // Phase 1: Collect and sort term keys (parallel key generation)
1161        // Key format: field_id (4 bytes) + term bytes
1162        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
1163            .inverted_index
1164            .iter()
1165            .map(|(term_key, posting_list)| {
1166                let term_str = self.term_interner.resolve(&term_key.term);
1167                let mut key = Vec::with_capacity(4 + term_str.len());
1168                key.extend_from_slice(&term_key.field.to_le_bytes());
1169                key.extend_from_slice(term_str.as_bytes());
1170                (key, posting_list)
1171            })
1172            .collect();
1173
1174        // Sort by key for SSTable ordering
1175        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
1176
1177        // Phase 2: Parallel serialization of posting lists
1178        // Each term's posting list is serialized independently
1179        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
1180            .into_par_iter()
1181            .map(|(key, posting_builder)| {
1182                // Build posting list from in-memory postings
1183                let mut full_postings = PostingList::with_capacity(posting_builder.len());
1184                for p in &posting_builder.postings {
1185                    full_postings.push(p.doc_id, p.term_freq as u32);
1186                }
1187
1188                // Build term info
1189                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
1190                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
1191
1192                // Don't inline if term has positions (inline format doesn't support position offsets)
1193                let has_positions = position_offsets.contains_key(&key);
1194                let result = if !has_positions
1195                    && let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs)
1196                {
1197                    SerializedPosting::Inline(inline)
1198                } else {
1199                    // Serialize to local buffer
1200                    let mut posting_bytes = Vec::new();
1201                    let block_list =
1202                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
1203                            .expect("BlockPostingList creation failed");
1204                    block_list
1205                        .serialize(&mut posting_bytes)
1206                        .expect("BlockPostingList serialization failed");
1207                    SerializedPosting::External {
1208                        bytes: posting_bytes,
1209                        doc_count: full_postings.doc_count(),
1210                    }
1211                };
1212
1213                (key, result)
1214            })
1215            .collect();
1216
1217        // Phase 3: Sequential assembly (must be sequential for offset calculation)
1218        let mut term_dict = Vec::new();
1219        let mut postings = Vec::new();
1220        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
1221
1222        for (key, serialized_posting) in serialized {
1223            let term_info = match serialized_posting {
1224                SerializedPosting::Inline(info) => info,
1225                SerializedPosting::External { bytes, doc_count } => {
1226                    let posting_offset = postings.len() as u64;
1227                    let posting_len = bytes.len() as u32;
1228                    postings.extend_from_slice(&bytes);
1229
1230                    // Look up position offset for this term
1231                    if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
1232                        TermInfo::external_with_positions(
1233                            posting_offset,
1234                            posting_len,
1235                            doc_count,
1236                            pos_offset,
1237                            pos_len,
1238                        )
1239                    } else {
1240                        TermInfo::external(posting_offset, posting_len, doc_count)
1241                    }
1242                }
1243            };
1244
1245            writer.insert(&key, &term_info)?;
1246        }
1247
1248        writer.finish()?;
1249        Ok((term_dict, postings))
1250    }
1251
1252    /// Build document store from streamed temp file (static method for parallel execution)
1253    ///
1254    /// Uses parallel processing to deserialize documents concurrently.
1255    fn build_store_parallel(
1256        store_path: &PathBuf,
1257        schema: &Schema,
1258        num_compression_threads: usize,
1259        compression_level: CompressionLevel,
1260    ) -> Result<Vec<u8>> {
1261        use super::store::EagerParallelStoreWriter;
1262
1263        let file = File::open(store_path)?;
1264        let mmap = unsafe { memmap2::Mmap::map(&file)? };
1265
1266        // Phase 1: Parse document boundaries (sequential, fast)
1267        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
1268        let mut offset = 0usize;
1269        while offset + 4 <= mmap.len() {
1270            let doc_len = u32::from_le_bytes([
1271                mmap[offset],
1272                mmap[offset + 1],
1273                mmap[offset + 2],
1274                mmap[offset + 3],
1275            ]) as usize;
1276            offset += 4;
1277
1278            if offset + doc_len > mmap.len() {
1279                break;
1280            }
1281
1282            doc_ranges.push((offset, doc_len));
1283            offset += doc_len;
1284        }
1285
1286        // Phase 2: Parallel deserialization of documents
1287        let docs: Vec<Document> = doc_ranges
1288            .into_par_iter()
1289            .filter_map(|(start, len)| {
1290                let doc_bytes = &mmap[start..start + len];
1291                super::store::deserialize_document(doc_bytes, schema).ok()
1292            })
1293            .collect();
1294
1295        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
1296        let mut store_data = Vec::new();
1297        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
1298            &mut store_data,
1299            num_compression_threads,
1300            compression_level,
1301        );
1302
1303        for doc in &docs {
1304            store_writer.store(doc, schema)?;
1305        }
1306
1307        store_writer.finish()?;
1308        Ok(store_data)
1309    }
1310}
1311
1312impl Drop for SegmentBuilder {
1313    fn drop(&mut self) {
1314        // Cleanup temp files on drop
1315        let _ = std::fs::remove_file(&self.store_path);
1316    }
1317}