hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11use std::fs::{File, OpenOptions};
12use std::io::{BufWriter, Write};
13use std::path::PathBuf;
14use std::sync::Arc;
15
16use hashbrown::HashMap;
17use lasso::{Rodeo, Spur};
18use rayon::prelude::*;
19use rustc_hash::FxHashMap;
20
21use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
22use crate::compression::CompressionLevel;
23use crate::directories::{Directory, DirectoryWriter};
24use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
25use crate::structures::{PostingList, SSTableWriter, TermInfo};
26use crate::tokenizer::BoxedTokenizer;
27use crate::wand::WandData;
28use crate::{DocId, Result};
29
/// Capacity, in bytes, of the buffered writer placed in front of the document store file.
const STORE_BUFFER_SIZE: usize = 16 << 20; // 16 MiB
32
33/// Interned term key combining field and term
34#[derive(Clone, Copy, PartialEq, Eq, Hash)]
35struct TermKey {
36    field: u32,
37    term: Spur,
38}
39
40/// Compact posting entry for in-memory storage
41#[derive(Clone, Copy)]
42struct CompactPosting {
43    doc_id: DocId,
44    term_freq: u16,
45}
46
47/// In-memory posting list for a term
48struct PostingListBuilder {
49    /// In-memory postings
50    postings: Vec<CompactPosting>,
51}
52
53impl PostingListBuilder {
54    fn new() -> Self {
55        Self {
56            postings: Vec::new(),
57        }
58    }
59
60    /// Add a posting, merging if same doc_id as last
61    #[inline]
62    fn add(&mut self, doc_id: DocId, term_freq: u32) {
63        // Check if we can merge with the last posting
64        if let Some(last) = self.postings.last_mut()
65            && last.doc_id == doc_id
66        {
67            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
68            return;
69        }
70        self.postings.push(CompactPosting {
71            doc_id,
72            term_freq: term_freq.min(u16::MAX as u32) as u16,
73        });
74    }
75
76    fn len(&self) -> usize {
77        self.postings.len()
78    }
79}
80
81/// Intermediate result for parallel posting serialization
82enum SerializedPosting {
83    /// Inline posting (small enough to fit in TermInfo)
84    Inline(TermInfo),
85    /// External posting with serialized bytes
86    External { bytes: Vec<u8>, doc_count: u32 },
87}
88
89/// Statistics for debugging segment builder performance
90#[derive(Debug, Clone)]
91pub struct SegmentBuilderStats {
92    /// Number of documents indexed
93    pub num_docs: u32,
94    /// Number of unique terms in the inverted index
95    pub unique_terms: usize,
96    /// Total postings in memory (across all terms)
97    pub postings_in_memory: usize,
98    /// Number of interned strings
99    pub interned_strings: usize,
100    /// Size of doc_field_lengths vector
101    pub doc_field_lengths_size: usize,
102    /// Estimated total memory usage in bytes
103    pub estimated_memory_bytes: usize,
104    /// Memory breakdown by component
105    pub memory_breakdown: MemoryBreakdown,
106}
107
/// Memory estimate split by builder component; all sizes are in bytes.
#[derive(Debug, Clone, Default)]
pub struct MemoryBreakdown {
    /// Bytes attributed to the posting vectors (`CompactPosting` entries).
    pub postings_bytes: usize,
    /// Bytes attributed to the inverted-index hash map itself.
    pub index_overhead_bytes: usize,
    /// Bytes attributed to the term interner.
    pub interner_bytes: usize,
    /// Bytes attributed to the per-document field-length vector.
    pub field_lengths_bytes: usize,
    /// Bytes attributed to buffered dense vectors.
    pub dense_vectors_bytes: usize,
    /// How many dense vectors are buffered (a count, not a size).
    pub dense_vector_count: usize,
}
124
125/// Configuration for segment builder
126#[derive(Clone)]
127pub struct SegmentBuilderConfig {
128    /// Directory for temporary spill files
129    pub temp_dir: PathBuf,
130    /// Compression level for document store
131    pub compression_level: CompressionLevel,
132    /// Number of threads for parallel compression
133    pub num_compression_threads: usize,
134    /// Initial capacity for term interner
135    pub interner_capacity: usize,
136    /// Initial capacity for posting lists hashmap
137    pub posting_map_capacity: usize,
138}
139
140impl Default for SegmentBuilderConfig {
141    fn default() -> Self {
142        Self {
143            temp_dir: std::env::temp_dir(),
144            compression_level: CompressionLevel(7),
145            num_compression_threads: num_cpus::get(),
146            interner_capacity: 1_000_000,
147            posting_map_capacity: 500_000,
148        }
149    }
150}
151
152/// Segment builder with optimized memory usage
153///
154/// Features:
155/// - Streams documents to disk immediately (no in-memory document storage)
156/// - Uses string interning for terms (reduced allocations)
157/// - Uses hashbrown HashMap (faster than BTreeMap)
158pub struct SegmentBuilder {
159    schema: Schema,
160    config: SegmentBuilderConfig,
161    tokenizers: FxHashMap<Field, BoxedTokenizer>,
162
163    /// String interner for terms - O(1) lookup and deduplication
164    term_interner: Rodeo,
165
166    /// Inverted index: term key -> posting list
167    inverted_index: HashMap<TermKey, PostingListBuilder>,
168
169    /// Streaming document store writer
170    store_file: BufWriter<File>,
171    store_path: PathBuf,
172
173    /// Document count
174    next_doc_id: DocId,
175
176    /// Per-field statistics for BM25F
177    field_stats: FxHashMap<u32, FieldStats>,
178
179    /// Per-document field lengths stored compactly
180    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
181    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
182    doc_field_lengths: Vec<u32>,
183    num_indexed_fields: usize,
184    field_to_slot: FxHashMap<u32, usize>,
185
186    /// Optional pre-computed WAND data for IDF values
187    wand_data: Option<Arc<WandData>>,
188
189    /// Reusable buffer for per-document term frequency aggregation
190    /// Avoids allocating a new hashmap for each document
191    local_tf_buffer: FxHashMap<Spur, u32>,
192
193    /// Reusable buffer for tokenization to avoid per-token String allocations
194    token_buffer: String,
195
196    /// Dense vector storage per field: field -> (doc_ids, vectors)
197    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
198    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
199}
200
201/// Builder for dense vector index using RaBitQ
202struct DenseVectorBuilder {
203    /// Dimension of vectors
204    dim: usize,
205    /// Document IDs with vectors
206    doc_ids: Vec<DocId>,
207    /// Flat vector storage (doc_ids.len() * dim floats)
208    vectors: Vec<f32>,
209}
210
211impl DenseVectorBuilder {
212    fn new(dim: usize) -> Self {
213        Self {
214            dim,
215            doc_ids: Vec::new(),
216            vectors: Vec::new(),
217        }
218    }
219
220    fn add(&mut self, doc_id: DocId, vector: &[f32]) {
221        debug_assert_eq!(vector.len(), self.dim, "Vector dimension mismatch");
222        self.doc_ids.push(doc_id);
223        self.vectors.extend_from_slice(vector);
224    }
225
226    fn len(&self) -> usize {
227        self.doc_ids.len()
228    }
229
230    /// Get all vectors as Vec<Vec<f32>> for RaBitQ indexing
231    fn get_vectors(&self) -> Vec<Vec<f32>> {
232        self.doc_ids
233            .iter()
234            .enumerate()
235            .map(|(i, _)| {
236                let start = i * self.dim;
237                self.vectors[start..start + self.dim].to_vec()
238            })
239            .collect()
240    }
241
242    /// Get vectors trimmed to specified dimension for matryoshka/MRL indexing
243    fn get_vectors_trimmed(&self, trim_dim: usize) -> Vec<Vec<f32>> {
244        debug_assert!(trim_dim <= self.dim, "trim_dim must be <= dim");
245        self.doc_ids
246            .iter()
247            .enumerate()
248            .map(|(i, _)| {
249                let start = i * self.dim;
250                self.vectors[start..start + trim_dim].to_vec()
251            })
252            .collect()
253    }
254}
255
256impl SegmentBuilder {
257    /// Create a new segment builder
258    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
259        let segment_id = uuid::Uuid::new_v4();
260        let store_path = config
261            .temp_dir
262            .join(format!("hermes_store_{}.tmp", segment_id));
263
264        let store_file = BufWriter::with_capacity(
265            STORE_BUFFER_SIZE,
266            OpenOptions::new()
267                .create(true)
268                .write(true)
269                .truncate(true)
270                .open(&store_path)?,
271        );
272
273        // Count indexed fields for compact field length storage
274        let mut num_indexed_fields = 0;
275        let mut field_to_slot = FxHashMap::default();
276        for (field, entry) in schema.fields() {
277            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
278                field_to_slot.insert(field.0, num_indexed_fields);
279                num_indexed_fields += 1;
280            }
281        }
282
283        Ok(Self {
284            schema,
285            tokenizers: FxHashMap::default(),
286            term_interner: Rodeo::new(),
287            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
288            store_file,
289            store_path,
290            next_doc_id: 0,
291            field_stats: FxHashMap::default(),
292            doc_field_lengths: Vec::new(),
293            num_indexed_fields,
294            field_to_slot,
295            wand_data: None,
296            local_tf_buffer: FxHashMap::default(),
297            token_buffer: String::with_capacity(64),
298            config,
299            dense_vectors: FxHashMap::default(),
300        })
301    }
302
303    /// Create with pre-computed WAND data
304    pub fn with_wand_data(
305        schema: Schema,
306        config: SegmentBuilderConfig,
307        wand_data: Arc<WandData>,
308    ) -> Result<Self> {
309        let mut builder = Self::new(schema, config)?;
310        builder.wand_data = Some(wand_data);
311        Ok(builder)
312    }
313
314    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
315        self.tokenizers.insert(field, tokenizer);
316    }
317
318    pub fn num_docs(&self) -> u32 {
319        self.next_doc_id
320    }
321
322    /// Get current statistics for debugging performance
323    pub fn stats(&self) -> SegmentBuilderStats {
324        use std::mem::size_of;
325
326        let postings_in_memory: usize =
327            self.inverted_index.values().map(|p| p.postings.len()).sum();
328
329        // Precise memory calculation using actual struct sizes
330        // CompactPosting: doc_id (u32) + term_freq (u16) = 6 bytes, but may have padding
331        let compact_posting_size = size_of::<CompactPosting>();
332
333        // Postings: actual Vec capacity * element size + Vec overhead (24 bytes on 64-bit)
334        let postings_bytes: usize = self
335            .inverted_index
336            .values()
337            .map(|p| {
338                p.postings.capacity() * compact_posting_size + size_of::<Vec<CompactPosting>>()
339            })
340            .sum();
341
342        // Inverted index: HashMap overhead per entry
343        // Each entry: TermKey (field u32 + Spur 4 bytes = 8 bytes) + PostingListBuilder + HashMap bucket overhead
344        // HashMap typically uses ~1.5x capacity, each bucket ~16-24 bytes
345        let term_key_size = size_of::<TermKey>();
346        let posting_builder_size = size_of::<PostingListBuilder>();
347        let hashmap_entry_overhead = 24; // bucket pointer + metadata
348        let index_overhead_bytes = self.inverted_index.len()
349            * (term_key_size + posting_builder_size + hashmap_entry_overhead);
350
351        // Term interner: Rodeo stores strings + metadata
352        // Each interned string: actual string bytes + Spur (4 bytes) + internal overhead (~16 bytes)
353        // We can't get exact string lengths, so estimate average term length of 8 bytes
354        let avg_term_len = 8;
355        let interner_overhead_per_string = size_of::<lasso::Spur>() + 16;
356        let interner_bytes =
357            self.term_interner.len() * (avg_term_len + interner_overhead_per_string);
358
359        // Doc field lengths: Vec<u32> with capacity
360        let field_lengths_bytes =
361            self.doc_field_lengths.capacity() * size_of::<u32>() + size_of::<Vec<u32>>();
362
363        // Dense vectors: actual capacity used
364        let mut dense_vectors_bytes: usize = 0;
365        let mut dense_vector_count: usize = 0;
366        for b in self.dense_vectors.values() {
367            // vectors: Vec<f32> capacity + doc_ids: Vec<DocId> capacity
368            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
369                + b.doc_ids.capacity() * size_of::<DocId>()
370                + size_of::<Vec<f32>>()
371                + size_of::<Vec<DocId>>();
372            dense_vector_count += b.doc_ids.len();
373        }
374
375        // Local buffers
376        let local_tf_buffer_bytes =
377            self.local_tf_buffer.capacity() * (size_of::<lasso::Spur>() + size_of::<u32>() + 16);
378
379        let estimated_memory_bytes = postings_bytes
380            + index_overhead_bytes
381            + interner_bytes
382            + field_lengths_bytes
383            + dense_vectors_bytes
384            + local_tf_buffer_bytes;
385
386        let memory_breakdown = MemoryBreakdown {
387            postings_bytes,
388            index_overhead_bytes,
389            interner_bytes,
390            field_lengths_bytes,
391            dense_vectors_bytes,
392            dense_vector_count,
393        };
394
395        SegmentBuilderStats {
396            num_docs: self.next_doc_id,
397            unique_terms: self.inverted_index.len(),
398            postings_in_memory,
399            interned_strings: self.term_interner.len(),
400            doc_field_lengths_size: self.doc_field_lengths.len(),
401            estimated_memory_bytes,
402            memory_breakdown,
403        }
404    }
405
406    /// Add a document - streams to disk immediately
407    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
408        let doc_id = self.next_doc_id;
409        self.next_doc_id += 1;
410
411        // Initialize field lengths for this document
412        let base_idx = self.doc_field_lengths.len();
413        self.doc_field_lengths
414            .resize(base_idx + self.num_indexed_fields, 0);
415
416        for (field, value) in doc.field_values() {
417            let entry = self.schema.get_field_entry(*field);
418            if entry.is_none() || !entry.unwrap().indexed {
419                continue;
420            }
421
422            let entry = entry.unwrap();
423            match (&entry.field_type, value) {
424                (FieldType::Text, FieldValue::Text(text)) => {
425                    let token_count = self.index_text_field(*field, doc_id, text)?;
426
427                    // Update field statistics
428                    let stats = self.field_stats.entry(field.0).or_default();
429                    stats.total_tokens += token_count as u64;
430                    stats.doc_count += 1;
431
432                    // Store field length compactly
433                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
434                        self.doc_field_lengths[base_idx + slot] = token_count;
435                    }
436                }
437                (FieldType::U64, FieldValue::U64(v)) => {
438                    self.index_numeric_field(*field, doc_id, *v)?;
439                }
440                (FieldType::I64, FieldValue::I64(v)) => {
441                    self.index_numeric_field(*field, doc_id, *v as u64)?;
442                }
443                (FieldType::F64, FieldValue::F64(v)) => {
444                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
445                }
446                (FieldType::DenseVector, FieldValue::DenseVector(vec)) => {
447                    self.index_dense_vector_field(*field, doc_id, vec)?;
448                }
449                (FieldType::SparseVector, FieldValue::SparseVector { indices, values }) => {
450                    self.index_sparse_vector_field(*field, doc_id, indices, values)?;
451                }
452                _ => {}
453            }
454        }
455
456        // Stream document to disk immediately
457        self.write_document_to_store(&doc)?;
458
459        Ok(doc_id)
460    }
461
462    /// Index a text field using interned terms
463    ///
464    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
465    /// Instead of allocating a String per token, we:
466    /// 1. Iterate over whitespace-split words
467    /// 2. Build lowercase token in a reusable buffer
468    /// 3. Intern directly from the buffer
469    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
470        // Phase 1: Aggregate term frequencies within this document
471        // Reuse buffer to avoid allocations
472        self.local_tf_buffer.clear();
473
474        let mut token_count = 0u32;
475
476        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
477        for word in text.split_whitespace() {
478            // Build lowercase token in reusable buffer
479            self.token_buffer.clear();
480            for c in word.chars() {
481                if c.is_alphanumeric() {
482                    for lc in c.to_lowercase() {
483                        self.token_buffer.push(lc);
484                    }
485                }
486            }
487
488            if self.token_buffer.is_empty() {
489                continue;
490            }
491
492            token_count += 1;
493
494            // Intern the term directly from buffer - O(1) amortized
495            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
496            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
497        }
498
499        // Phase 2: Insert aggregated terms into inverted index
500        // Now we only do one inverted_index lookup per unique term in doc
501        let field_id = field.0;
502
503        for (&term_spur, &tf) in &self.local_tf_buffer {
504            let term_key = TermKey {
505                field: field_id,
506                term: term_spur,
507            };
508
509            let posting = self
510                .inverted_index
511                .entry(term_key)
512                .or_insert_with(PostingListBuilder::new);
513            posting.add(doc_id, tf);
514        }
515
516        Ok(token_count)
517    }
518
519    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
520        // For numeric fields, we use a special encoding
521        let term_str = format!("__num_{}", value);
522        let term_spur = self.term_interner.get_or_intern(&term_str);
523
524        let term_key = TermKey {
525            field: field.0,
526            term: term_spur,
527        };
528
529        let posting = self
530            .inverted_index
531            .entry(term_key)
532            .or_insert_with(PostingListBuilder::new);
533        posting.add(doc_id, 1);
534
535        Ok(())
536    }
537
538    /// Index a dense vector field
539    fn index_dense_vector_field(
540        &mut self,
541        field: Field,
542        doc_id: DocId,
543        vector: &[f32],
544    ) -> Result<()> {
545        let dim = vector.len();
546
547        let builder = self
548            .dense_vectors
549            .entry(field.0)
550            .or_insert_with(|| DenseVectorBuilder::new(dim));
551
552        // Verify dimension consistency
553        if builder.dim != dim && builder.len() > 0 {
554            return Err(crate::Error::Schema(format!(
555                "Dense vector dimension mismatch: expected {}, got {}",
556                builder.dim, dim
557            )));
558        }
559
560        builder.add(doc_id, vector);
561        Ok(())
562    }
563
564    /// Index a sparse vector field using the inverted index
565    ///
566    /// Each dimension becomes a term (prefixed with __sparse_), and the weight
567    /// is stored as a quantized term frequency. This allows reusing the existing
568    /// posting list infrastructure for sparse vector retrieval.
569    ///
570    /// Weights below the configured `weight_threshold` are not indexed.
571    fn index_sparse_vector_field(
572        &mut self,
573        field: Field,
574        doc_id: DocId,
575        indices: &[u32],
576        values: &[f32],
577    ) -> Result<()> {
578        // Get weight threshold from field config (default 0.0 = no filtering)
579        let weight_threshold = self
580            .schema
581            .get_field_entry(field)
582            .and_then(|entry| entry.sparse_vector_config.as_ref())
583            .map(|config| config.weight_threshold)
584            .unwrap_or(0.0);
585
586        for (&dim_id, &weight) in indices.iter().zip(values.iter()) {
587            // Skip weights below threshold
588            if weight.abs() < weight_threshold {
589                continue;
590            }
591
592            // Use a special prefix to distinguish sparse vector dimensions from text terms
593            let term_str = format!("__sparse_{}", dim_id);
594            let term_spur = self.term_interner.get_or_intern(&term_str);
595
596            let term_key = TermKey {
597                field: field.0,
598                term: term_spur,
599            };
600
601            // Quantize weight to term frequency (scale by 1000 for precision)
602            // This is a simple approach; more sophisticated quantization can be added
603            let quantized_weight = (weight.abs() * 1000.0).min(u16::MAX as f32) as u32;
604
605            let posting = self
606                .inverted_index
607                .entry(term_key)
608                .or_insert_with(PostingListBuilder::new);
609            posting.add(doc_id, quantized_weight.max(1));
610        }
611
612        Ok(())
613    }
614
615    /// Write document to streaming store
616    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
617        use byteorder::{LittleEndian, WriteBytesExt};
618
619        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
620
621        self.store_file
622            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
623        self.store_file.write_all(&doc_bytes)?;
624
625        Ok(())
626    }
627
628    /// Build the final segment
629    pub async fn build<D: Directory + DirectoryWriter>(
630        mut self,
631        dir: &D,
632        segment_id: SegmentId,
633    ) -> Result<SegmentMeta> {
634        // Flush any buffered data
635        self.store_file.flush()?;
636
637        let files = SegmentFiles::new(segment_id.0);
638
639        // Extract data needed for parallel processing
640        let store_path = self.store_path.clone();
641        let schema = self.schema.clone();
642        let num_compression_threads = self.config.num_compression_threads;
643        let compression_level = self.config.compression_level;
644
645        // Build postings and document store in parallel
646        let (postings_result, store_result) = rayon::join(
647            || self.build_postings(),
648            || {
649                Self::build_store_parallel(
650                    &store_path,
651                    &schema,
652                    num_compression_threads,
653                    compression_level,
654                )
655            },
656        );
657
658        let (term_dict_data, postings_data) = postings_result?;
659        let store_data = store_result?;
660
661        // Write to directory
662        dir.write(&files.term_dict, &term_dict_data).await?;
663        dir.write(&files.postings, &postings_data).await?;
664        dir.write(&files.store, &store_data).await?;
665
666        // Build and write dense vector indexes (RaBitQ) - all in one file
667        if !self.dense_vectors.is_empty() {
668            let vectors_data = self.build_vectors_file()?;
669            if !vectors_data.is_empty() {
670                dir.write(&files.vectors, &vectors_data).await?;
671            }
672        }
673
674        let meta = SegmentMeta {
675            id: segment_id.0,
676            num_docs: self.next_doc_id,
677            field_stats: self.field_stats.clone(),
678        };
679
680        dir.write(&files.meta, &meta.serialize()?).await?;
681
682        // Cleanup temp files
683        let _ = std::fs::remove_file(&self.store_path);
684
685        Ok(meta)
686    }
687
688    /// Build unified vectors file containing all dense vector indexes
689    ///
690    /// File format:
691    /// - Header: num_fields (u32)
692    /// - For each field: field_id (u32), index_type (u8), offset (u64), length (u64)
693    /// - Data: concatenated serialized indexes (RaBitQ, IVF-RaBitQ, or ScaNN)
694    fn build_vectors_file(&self) -> Result<Vec<u8>> {
695        use crate::dsl::VectorIndexType;
696        use byteorder::{LittleEndian, WriteBytesExt};
697
698        // Build all indexes first: (field_id, index_type, data)
699        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
700
701        for (&field_id, builder) in &self.dense_vectors {
702            if builder.len() == 0 {
703                continue;
704            }
705
706            let field = crate::dsl::Field(field_id);
707
708            // Get dense vector config
709            let dense_config = self
710                .schema
711                .get_field_entry(field)
712                .and_then(|e| e.dense_vector_config.as_ref());
713
714            // Get vectors, potentially trimmed for matryoshka/MRL indexing
715            let index_dim = dense_config.map(|c| c.index_dim()).unwrap_or(builder.dim);
716            let vectors = if index_dim < builder.dim {
717                // Trim vectors to mrl_dim for indexing
718                builder.get_vectors_trimmed(index_dim)
719            } else {
720                builder.get_vectors()
721            };
722
723            let (index_type, index_bytes) = match dense_config.map(|c| c.index_type) {
724                Some(VectorIndexType::ScaNN) => {
725                    // ScaNN (IVF-PQ) index
726                    let config = dense_config.unwrap();
727                    let centroids_path =
728                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
729                            crate::Error::Schema("ScaNN requires coarse_centroids_path".into())
730                        })?;
731                    let codebook_path = config.pq_codebook_path.as_ref().ok_or_else(|| {
732                        crate::Error::Schema("ScaNN requires pq_codebook_path".into())
733                    })?;
734
735                    let coarse_centroids = crate::structures::CoarseCentroids::load(
736                        std::path::Path::new(centroids_path),
737                    )
738                    .map_err(crate::Error::Io)?;
739
740                    let pq_codebook =
741                        crate::structures::PQCodebook::load(std::path::Path::new(codebook_path))
742                            .map_err(crate::Error::Io)?;
743
744                    let doc_ids: Vec<u32> = builder.doc_ids.clone();
745                    let ivfpq_config = crate::structures::IVFPQConfig::new(index_dim)
746                        .with_store_raw(config.store_raw);
747
748                    let ivfpq_index = crate::structures::IVFPQIndex::build(
749                        ivfpq_config,
750                        &coarse_centroids,
751                        &pq_codebook,
752                        &vectors,
753                        Some(doc_ids.as_slice()),
754                    );
755
756                    // Serialize ScaNN index (IVFPQIndex only - codebook loaded separately)
757                    let bytes = ivfpq_index
758                        .to_bytes()
759                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
760                    (2u8, bytes) // 2 = ScaNN
761                }
762                Some(VectorIndexType::IvfRaBitQ) => {
763                    // IVF-RaBitQ index
764                    let config = dense_config.unwrap();
765                    let centroids_path =
766                        config.coarse_centroids_path.as_ref().ok_or_else(|| {
767                            crate::Error::Schema("IVF-RaBitQ requires coarse_centroids_path".into())
768                        })?;
769
770                    match crate::structures::CoarseCentroids::load(std::path::Path::new(
771                        centroids_path,
772                    )) {
773                        Ok(coarse_centroids) => {
774                            let ivf_cfg = crate::structures::IVFRaBitQConfig::new(index_dim)
775                                .with_store_raw(config.store_raw);
776                            let rabitq_codebook = crate::structures::RaBitQCodebook::new(
777                                crate::structures::RaBitQConfig::new(index_dim),
778                            );
779                            let doc_ids: Vec<u32> = builder.doc_ids.clone();
780                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
781                                ivf_cfg,
782                                &coarse_centroids,
783                                &rabitq_codebook,
784                                &vectors,
785                                Some(doc_ids.as_slice()),
786                            );
787                            let bytes = ivf_index
788                                .to_bytes()
789                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
790                            (1u8, bytes) // 1 = IVF-RaBitQ
791                        }
792                        Err(e) => {
793                            log::warn!("Failed to load centroids: {}, falling back to RaBitQ", e);
794                            let cfg = crate::structures::RaBitQConfig::new(index_dim);
795                            let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, true);
796                            let bytes = serde_json::to_vec(&idx)
797                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
798                            (0u8, bytes) // 0 = RaBitQ
799                        }
800                    }
801                }
802                _ => {
803                    // Default: RaBitQ
804                    let store_raw = dense_config.map(|c| c.store_raw).unwrap_or(true);
805                    let cfg = crate::structures::RaBitQConfig::new(index_dim);
806                    let idx = crate::structures::RaBitQIndex::build(cfg, &vectors, store_raw);
807                    let bytes = serde_json::to_vec(&idx)
808                        .map_err(|e| crate::Error::Serialization(e.to_string()))?;
809                    (0u8, bytes) // 0 = RaBitQ
810                }
811            };
812
813            field_indexes.push((field_id, index_type, index_bytes));
814        }
815
816        if field_indexes.is_empty() {
817            return Ok(Vec::new());
818        }
819
820        // Sort by field_id for consistent ordering
821        field_indexes.sort_by_key(|(id, _, _)| *id);
822
823        // Calculate header size: num_fields + (field_id, index_type, offset, len) per field
824        let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
825
826        // Build output
827        let mut output = Vec::new();
828
829        // Write number of fields
830        output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
831
832        // Calculate offsets and write header entries
833        let mut current_offset = header_size as u64;
834        for (field_id, index_type, data) in &field_indexes {
835            output.write_u32::<LittleEndian>(*field_id)?;
836            output.write_u8(*index_type)?;
837            output.write_u64::<LittleEndian>(current_offset)?;
838            output.write_u64::<LittleEndian>(data.len() as u64)?;
839            current_offset += data.len() as u64;
840        }
841
842        // Write data
843        for (_, _, data) in field_indexes {
844            output.extend_from_slice(&data);
845        }
846
847        Ok(output)
848    }
849
850    /// Build postings from inverted index
851    ///
852    /// Uses parallel processing to serialize posting lists concurrently.
853    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
854        // Phase 1: Collect and sort term keys (parallel key generation)
855        // Key format: field_id (4 bytes) + term bytes
856        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
857            .inverted_index
858            .iter()
859            .map(|(term_key, posting_list)| {
860                let term_str = self.term_interner.resolve(&term_key.term);
861                let mut key = Vec::with_capacity(4 + term_str.len());
862                key.extend_from_slice(&term_key.field.to_le_bytes());
863                key.extend_from_slice(term_str.as_bytes());
864                (key, posting_list)
865            })
866            .collect();
867
868        // Sort by key for SSTable ordering
869        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
870
871        // Phase 2: Parallel serialization of posting lists
872        // Each term's posting list is serialized independently
873        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
874            .into_par_iter()
875            .map(|(key, posting_builder)| {
876                // Build posting list from in-memory postings
877                let mut full_postings = PostingList::with_capacity(posting_builder.len());
878                for p in &posting_builder.postings {
879                    full_postings.push(p.doc_id, p.term_freq as u32);
880                }
881
882                // Build term info
883                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
884                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
885
886                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
887                    SerializedPosting::Inline(inline)
888                } else {
889                    // Serialize to local buffer
890                    let mut posting_bytes = Vec::new();
891                    let block_list =
892                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
893                            .expect("BlockPostingList creation failed");
894                    block_list
895                        .serialize(&mut posting_bytes)
896                        .expect("BlockPostingList serialization failed");
897                    SerializedPosting::External {
898                        bytes: posting_bytes,
899                        doc_count: full_postings.doc_count(),
900                    }
901                };
902
903                (key, result)
904            })
905            .collect();
906
907        // Phase 3: Sequential assembly (must be sequential for offset calculation)
908        let mut term_dict = Vec::new();
909        let mut postings = Vec::new();
910        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
911
912        for (key, serialized_posting) in serialized {
913            let term_info = match serialized_posting {
914                SerializedPosting::Inline(info) => info,
915                SerializedPosting::External { bytes, doc_count } => {
916                    let posting_offset = postings.len() as u64;
917                    let posting_len = bytes.len() as u32;
918                    postings.extend_from_slice(&bytes);
919                    TermInfo::external(posting_offset, posting_len, doc_count)
920                }
921            };
922
923            writer.insert(&key, &term_info)?;
924        }
925
926        writer.finish()?;
927        Ok((term_dict, postings))
928    }
929
930    /// Build document store from streamed temp file (static method for parallel execution)
931    ///
932    /// Uses parallel processing to deserialize documents concurrently.
933    fn build_store_parallel(
934        store_path: &PathBuf,
935        schema: &Schema,
936        num_compression_threads: usize,
937        compression_level: CompressionLevel,
938    ) -> Result<Vec<u8>> {
939        use super::store::EagerParallelStoreWriter;
940
941        let file = File::open(store_path)?;
942        let mmap = unsafe { memmap2::Mmap::map(&file)? };
943
944        // Phase 1: Parse document boundaries (sequential, fast)
945        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
946        let mut offset = 0usize;
947        while offset + 4 <= mmap.len() {
948            let doc_len = u32::from_le_bytes([
949                mmap[offset],
950                mmap[offset + 1],
951                mmap[offset + 2],
952                mmap[offset + 3],
953            ]) as usize;
954            offset += 4;
955
956            if offset + doc_len > mmap.len() {
957                break;
958            }
959
960            doc_ranges.push((offset, doc_len));
961            offset += doc_len;
962        }
963
964        // Phase 2: Parallel deserialization of documents
965        let docs: Vec<Document> = doc_ranges
966            .into_par_iter()
967            .filter_map(|(start, len)| {
968                let doc_bytes = &mmap[start..start + len];
969                super::store::deserialize_document(doc_bytes, schema).ok()
970            })
971            .collect();
972
973        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
974        let mut store_data = Vec::new();
975        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
976            &mut store_data,
977            num_compression_threads,
978            compression_level,
979        );
980
981        for doc in &docs {
982            store_writer.store(doc, schema)?;
983        }
984
985        store_writer.finish()?;
986        Ok(store_data)
987    }
988}
989
990impl Drop for SegmentBuilder {
991    fn drop(&mut self) {
992        // Cleanup temp files on drop
993        let _ = std::fs::remove_file(&self.store_path);
994    }
995}