
hermes_core/segment/builder/mod.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Zero-copy store build**: Pre-serialized doc bytes passed directly to compressor
//! - **Parallel posting serialization**: Rayon parallel sort + serialize
//! - **Inline posting fast path**: Small terms skip PostingList/BlockPostingList entirely
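//!
//! Typical lifecycle (as implemented below): create the builder with `SegmentBuilder::new`,
//! feed documents through `add_document` (each document's serialized bytes are streamed to
//! a temp store file immediately), then call `build` to stream the term dictionary,
//! postings, document store, dense-vector, and sparse-vector files into the target
//! directory.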

mod config;
mod posting;
mod vectors;

pub use config::{MemoryBreakdown, SegmentBuilderConfig, SegmentBuilderStats};

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::mem::size_of;
use std::path::PathBuf;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use crate::compression::CompressionLevel;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::{DocId, Result};

use posting::{
    CompactPosting, PositionPostingListBuilder, PostingListBuilder, SerializedPosting, TermKey,
};
use vectors::{DenseVectorBuilder, SparseVectorBuilder};

use super::vector_data::FlatVectorData;

/// Size of the document store buffer before writing to disk
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for per-document position tracking (when positions enabled)
    /// Avoids allocating a new hashmap for each text field per document
    local_positions: FxHashMap<Spur, Vec<u32>>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,

    /// Dense vector storage per field: field -> (doc_ids, vectors)
    /// Vectors are stored as flat f32 arrays for efficient RaBitQ indexing
    dense_vectors: FxHashMap<u32, DenseVectorBuilder>,

    /// Sparse vector storage per field: field -> SparseVectorBuilder
    /// Uses proper BlockSparsePostingList with configurable quantization
    sparse_vectors: FxHashMap<u32, SparseVectorBuilder>,

    /// Position index for fields with positions enabled
    /// term key -> position posting list
    position_index: HashMap<TermKey, PositionPostingListBuilder>,

    /// Fields that have position tracking enabled, with their mode
    position_enabled_fields: FxHashMap<u32, Option<crate::dsl::PositionMode>>,

    /// Current element ordinal for multi-valued fields (reset per document)
    current_element_ordinal: FxHashMap<u32, u32>,

    /// Incrementally tracked memory estimate (avoids expensive stats() calls)
    estimated_memory: usize,
}

impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        // Also track which fields have position recording enabled
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        let mut position_enabled_fields = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
                if entry.positions.is_some() {
                    position_enabled_fields.insert(field.0, entry.positions);
                }
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            local_tf_buffer: FxHashMap::default(),
            local_positions: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
            dense_vectors: FxHashMap::default(),
            sparse_vectors: FxHashMap::default(),
            position_index: HashMap::new(),
            position_enabled_fields,
            current_element_ordinal: FxHashMap::default(),
            estimated_memory: 0,
        })
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Fast O(1) memory estimate - updated incrementally during indexing
    #[inline]
    pub fn estimated_memory_bytes(&self) -> usize {
        self.estimated_memory
    }

    /// Recalibrate incremental memory estimate using capacity-based calculation.
    /// More expensive than estimated_memory_bytes() — O(terms + dims) vs O(1) —
    /// but accounts for Vec capacity growth (doubling) and HashMap table overhead.
    /// Call periodically (e.g. every 1000 docs) to prevent drift.
    pub fn recalibrate_memory(&mut self) {
        self.estimated_memory = self.stats().estimated_memory_bytes;
    }

    /// Count the total number of sparse dimensions across all fields (unique per field, summed over fields)
    pub fn sparse_dim_count(&self) -> usize {
        self.sparse_vectors.values().map(|b| b.postings.len()).sum()
    }

    /// Get current statistics for debugging performance (expensive - iterates all data)
    pub fn stats(&self) -> SegmentBuilderStats {
        use std::mem::size_of;

        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();

        // Size constants computed from actual types
        let compact_posting_size = size_of::<CompactPosting>();
        let vec_overhead = size_of::<Vec<u8>>(); // Vec header: ptr + len + cap = 24 bytes on 64-bit
        let term_key_size = size_of::<TermKey>();
        let posting_builder_size = size_of::<PostingListBuilder>();
        let spur_size = size_of::<lasso::Spur>();
        let sparse_entry_size = size_of::<(DocId, u16, f32)>();

        // hashbrown HashMap entry overhead: key + value + 1 byte control + padding
        // Measured: ~(key_size + value_size + 8) per entry on average
        let hashmap_entry_base_overhead = 8usize;

        // FxHashMap uses same layout as hashbrown
        let fxhashmap_entry_overhead = hashmap_entry_base_overhead;

        // Postings memory
        let postings_bytes: usize = self
            .inverted_index
            .values()
            .map(|p| p.postings.capacity() * compact_posting_size + vec_overhead)
            .sum();

        // Inverted index overhead
        let index_overhead_bytes = self.inverted_index.len()
            * (term_key_size + posting_builder_size + hashmap_entry_base_overhead);

        // Term interner: Rodeo stores strings + metadata
        // Rodeo internal: string bytes + Spur + arena overhead (~2 pointers per string)
        let interner_arena_overhead = 2 * size_of::<usize>();
        let avg_term_len = 8; // Estimated average term length
        let interner_bytes =
            self.term_interner.len() * (avg_term_len + spur_size + interner_arena_overhead);

        // Doc field lengths
        let field_lengths_bytes =
            self.doc_field_lengths.capacity() * size_of::<u32>() + vec_overhead;

        // Dense vectors
        let mut dense_vectors_bytes: usize = 0;
        let mut dense_vector_count: usize = 0;
        let doc_id_ordinal_size = size_of::<(DocId, u16)>();
        for b in self.dense_vectors.values() {
            dense_vectors_bytes += b.vectors.capacity() * size_of::<f32>()
                + b.doc_ids.capacity() * doc_id_ordinal_size
                + 2 * vec_overhead; // Two Vecs
            dense_vector_count += b.doc_ids.len();
        }

        // Local buffers
        let local_tf_entry_size = spur_size + size_of::<u32>() + fxhashmap_entry_overhead;
        let local_tf_buffer_bytes = self.local_tf_buffer.capacity() * local_tf_entry_size;

        // Sparse vectors
        let mut sparse_vectors_bytes: usize = 0;
        for builder in self.sparse_vectors.values() {
            for postings in builder.postings.values() {
                sparse_vectors_bytes += postings.capacity() * sparse_entry_size + vec_overhead;
            }
            // Inner FxHashMap overhead: u32 key + Vec value ptr + overhead
            let inner_entry_size = size_of::<u32>() + vec_overhead + fxhashmap_entry_overhead;
            sparse_vectors_bytes += builder.postings.len() * inner_entry_size;
        }
        // Outer FxHashMap overhead
        let outer_sparse_entry_size =
            size_of::<u32>() + size_of::<SparseVectorBuilder>() + fxhashmap_entry_overhead;
        sparse_vectors_bytes += self.sparse_vectors.len() * outer_sparse_entry_size;

        // Position index
        let mut position_index_bytes: usize = 0;
        for pos_builder in self.position_index.values() {
            for (_, positions) in &pos_builder.postings {
                position_index_bytes += positions.capacity() * size_of::<u32>() + vec_overhead;
            }
            // Vec<(DocId, Vec<u32>)> entry size
            let pos_entry_size = size_of::<DocId>() + vec_overhead;
            position_index_bytes += pos_builder.postings.capacity() * pos_entry_size;
        }
        // HashMap overhead for position_index
        let pos_index_entry_size =
            term_key_size + size_of::<PositionPostingListBuilder>() + hashmap_entry_base_overhead;
        position_index_bytes += self.position_index.len() * pos_index_entry_size;

        let estimated_memory_bytes = postings_bytes
            + index_overhead_bytes
            + interner_bytes
            + field_lengths_bytes
            + dense_vectors_bytes
            + local_tf_buffer_bytes
            + sparse_vectors_bytes
            + position_index_bytes;

        let memory_breakdown = MemoryBreakdown {
            postings_bytes,
            index_overhead_bytes,
            interner_bytes,
            field_lengths_bytes,
            dense_vectors_bytes,
            dense_vector_count,
            sparse_vectors_bytes,
            position_index_bytes,
        };

        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            estimated_memory_bytes,
            memory_breakdown,
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);
        self.estimated_memory += self.num_indexed_fields * std::mem::size_of::<u32>();

        // Reset element ordinals for this document (for multi-valued fields)
        self.current_element_ordinal.clear();

        for (field, value) in doc.field_values() {
            let entry = self.schema.get_field_entry(*field);
            if entry.is_none() {
                continue;
            }

            let entry = entry.unwrap();
            // Dense vectors are written to .vectors when indexed || stored
            // Other field types require indexed
            let dominated_by_index = matches!(&entry.field_type, FieldType::DenseVector);
            if !dominated_by_index && !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    // Get current element ordinal for multi-valued fields
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    let token_count =
                        self.index_text_field(*field, doc_id, text, element_ordinal)?;
                    // Increment element ordinal for next value of this field
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    // Only count each document once, even for multi-value fields
                    if element_ordinal == 0 {
                        stats.doc_count += 1;
                    }

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                (FieldType::DenseVector, FieldValue::DenseVector(vec))
                    if entry.indexed || entry.stored =>
                {
                    // Dense vectors written to .vectors (not .store) when indexed || stored
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_dense_vector_field(*field, doc_id, element_ordinal as u16, vec)?;
                    // Increment element ordinal for next value of this field
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                (FieldType::SparseVector, FieldValue::SparseVector(entries)) => {
                    // Get current element ordinal for multi-valued fields
                    let element_ordinal = *self.current_element_ordinal.get(&field.0).unwrap_or(&0);
                    self.index_sparse_vector_field(
                        *field,
                        doc_id,
                        element_ordinal as u16,
                        entries,
                    )?;
                    // Increment element ordinal for next value of this field
                    *self.current_element_ordinal.entry(field.0).or_insert(0) += 1;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
    /// Instead of allocating a String per token, we:
    /// 1. Iterate over whitespace-split words
    /// 2. Build lowercase token in a reusable buffer
    /// 3. Intern directly from the buffer
    ///
    /// If position recording is enabled for this field, also records token positions
    /// encoded as (element_ordinal << 20) | token_position.
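    ///
    /// Note: this fast path tokenizes inline (whitespace split, alphanumeric-only lowercase)
    /// and does not consult the per-field tokenizers registered via `set_tokenizer`.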
    fn index_text_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        text: &str,
        element_ordinal: u32,
    ) -> Result<u32> {
        use crate::dsl::PositionMode;

        let field_id = field.0;
        let position_mode = self
            .position_enabled_fields
            .get(&field_id)
            .copied()
            .flatten();

        // Phase 1: Aggregate term frequencies within this document
        // Also collect positions if enabled
        // Reuse buffers to avoid allocations
        self.local_tf_buffer.clear();
        // Clear position Vecs in-place (keeps allocated capacity for reuse)
        for v in self.local_positions.values_mut() {
            v.clear();
        }

        let mut token_position = 0u32;

        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
        for word in text.split_whitespace() {
            // Build lowercase token in reusable buffer
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            // Intern the term directly from buffer - O(1) amortized
            let is_new_string = !self.term_interner.contains(&self.token_buffer);
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            if is_new_string {
                use std::mem::size_of;
                // string bytes + Spur + arena overhead (2 pointers)
                self.estimated_memory +=
                    self.token_buffer.len() + size_of::<Spur>() + 2 * size_of::<usize>();
            }
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;

            // Record position based on mode
            if let Some(mode) = position_mode {
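                // Packing note: token_position is assumed to fit in the low 20 bits;
                // no masking is applied, so larger values would spill into the ordinal bits.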
                let encoded_pos = match mode {
                    // Ordinal only: just store element ordinal (token position = 0)
                    PositionMode::Ordinal => element_ordinal << 20,
                    // Token position only: just store token position (ordinal = 0)
                    PositionMode::TokenPosition => token_position,
                    // Full: encode both
                    PositionMode::Full => (element_ordinal << 20) | token_position,
                };
                self.local_positions
                    .entry(term_spur)
                    .or_default()
                    .push(encoded_pos);
            }

            token_position += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let is_new_term = !self.inverted_index.contains_key(&term_key);
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);

            // Incremental memory tracking
            use std::mem::size_of;
            self.estimated_memory += size_of::<CompactPosting>();
            if is_new_term {
                // HashMap entry overhead + PostingListBuilder + Vec header
                self.estimated_memory +=
                    size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
            }

            // Add positions if enabled
            if position_mode.is_some()
                && let Some(positions) = self.local_positions.get(&term_spur)
            {
                let is_new_pos_term = !self.position_index.contains_key(&term_key);
                let pos_posting = self
                    .position_index
                    .entry(term_key)
                    .or_insert_with(PositionPostingListBuilder::new);
                for &pos in positions {
                    pos_posting.add_position(doc_id, pos);
                }
                // Incremental memory tracking for position index
                self.estimated_memory += positions.len() * size_of::<u32>();
                if is_new_pos_term {
                    self.estimated_memory +=
                        size_of::<TermKey>() + size_of::<PositionPostingListBuilder>() + 24;
                }
            }
        }

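        // token_position now equals the number of tokens emitted, i.e. the field
        // length returned to the caller for field statistics.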
        Ok(token_position)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        use std::mem::size_of;

        // For numeric fields, we use a special encoding
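        // The synthetic term is `__num_<value>`; callers pass i64 values reinterpreted
        // as u64 and f64 values as raw bits, so lookups must apply the same mapping.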
        let term_str = format!("__num_{}", value);
        let is_new_string = !self.term_interner.contains(&term_str);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let is_new_term = !self.inverted_index.contains_key(&term_key);
        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        // Incremental memory tracking
        self.estimated_memory += size_of::<CompactPosting>();
        if is_new_term {
            self.estimated_memory += size_of::<TermKey>() + size_of::<PostingListBuilder>() + 24;
        }
        if is_new_string {
            self.estimated_memory += term_str.len() + size_of::<Spur>() + 2 * size_of::<usize>();
        }

        Ok(())
    }

    /// Index a dense vector field with ordinal tracking
    fn index_dense_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        vector: &[f32],
    ) -> Result<()> {
        let dim = vector.len();

        let builder = self
            .dense_vectors
            .entry(field.0)
            .or_insert_with(|| DenseVectorBuilder::new(dim));

        // Verify dimension consistency
        if builder.dim != dim && builder.len() > 0 {
            return Err(crate::Error::Schema(format!(
                "Dense vector dimension mismatch: expected {}, got {}",
                builder.dim, dim
            )));
        }

        builder.add(doc_id, ordinal, vector);

        // Incremental memory tracking
        use std::mem::{size_of, size_of_val};
        self.estimated_memory += size_of_val(vector) + size_of::<(DocId, u16)>();

        Ok(())
    }

    /// Index a sparse vector field using dedicated sparse posting lists
    ///
    /// Collects (doc_id, ordinal, weight) postings per dimension. During commit, these are
    /// converted to BlockSparsePostingList with proper quantization from SparseVectorConfig.
    ///
    /// Weights below the configured `weight_threshold` are not indexed.
    fn index_sparse_vector_field(
        &mut self,
        field: Field,
        doc_id: DocId,
        ordinal: u16,
        entries: &[(u32, f32)],
    ) -> Result<()> {
        // Get weight threshold from field config (default 0.0 = no filtering)
        let weight_threshold = self
            .schema
            .get_field_entry(field)
            .and_then(|entry| entry.sparse_vector_config.as_ref())
            .map(|config| config.weight_threshold)
            .unwrap_or(0.0);

        let builder = self
            .sparse_vectors
            .entry(field.0)
            .or_insert_with(SparseVectorBuilder::new);

        for &(dim_id, weight) in entries {
            // Skip weights below threshold
            if weight.abs() < weight_threshold {
                continue;
            }

            // Incremental memory tracking
            use std::mem::size_of;
            let is_new_dim = !builder.postings.contains_key(&dim_id);
            builder.add(dim_id, doc_id, ordinal, weight);
            self.estimated_memory += size_of::<(DocId, u16, f32)>();
            if is_new_dim {
                // HashMap entry overhead + Vec header
                self.estimated_memory += size_of::<u32>() + size_of::<Vec<(DocId, u16, f32)>>() + 8; // 8 = hashmap control byte + padding
            }
        }

        Ok(())
    }

    /// Write document to streaming store
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Build the final segment
    ///
    /// Streams all data directly to disk via StreamingWriter to avoid buffering
    /// entire serialized outputs in memory. Each phase consumes and drops its
    /// source data before the next phase begins.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
        trained: Option<&super::TrainedVectorStructures>,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Phase 1: Stream positions directly to disk (consumes position_index)
        let position_index = std::mem::take(&mut self.position_index);
        let position_offsets = if !position_index.is_empty() {
            let mut pos_writer = dir.streaming_writer(&files.positions).await?;
            let offsets = Self::build_positions_streaming(
                position_index,
                &self.term_interner,
                &mut *pos_writer,
            )?;
            pos_writer.finish()?;
            offsets
        } else {
            FxHashMap::default()
        };

        // Phase 2: 4-way parallel build — postings, store, dense vectors, sparse vectors
        // These are fully independent: different source data, different output files.
        let inverted_index = std::mem::take(&mut self.inverted_index);
        let term_interner = std::mem::replace(&mut self.term_interner, Rodeo::new());
        let store_path = self.store_path.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;
        let dense_vectors = std::mem::take(&mut self.dense_vectors);
        let mut sparse_vectors = std::mem::take(&mut self.sparse_vectors);
        let schema = &self.schema;

        // Pre-create all streaming writers (async) before entering sync rayon scope
        let mut term_dict_writer = dir.streaming_writer(&files.term_dict).await?;
        let mut postings_writer = dir.streaming_writer(&files.postings).await?;
        let mut store_writer = dir.streaming_writer(&files.store).await?;
        let mut vectors_writer = if !dense_vectors.is_empty() {
            Some(dir.streaming_writer(&files.vectors).await?)
        } else {
            None
        };
        let mut sparse_writer = if !sparse_vectors.is_empty() {
            Some(dir.streaming_writer(&files.sparse).await?)
        } else {
            None
        };

        let ((postings_result, store_result), (vectors_result, sparse_result)) = rayon::join(
            || {
                rayon::join(
                    || {
                        Self::build_postings_streaming(
                            inverted_index,
                            term_interner,
                            &position_offsets,
                            &mut *term_dict_writer,
                            &mut *postings_writer,
                        )
                    },
                    || {
                        Self::build_store_streaming(
                            &store_path,
                            num_compression_threads,
                            compression_level,
                            &mut *store_writer,
                        )
                    },
                )
            },
            || {
                rayon::join(
                    || -> Result<()> {
                        if let Some(ref mut w) = vectors_writer {
                            Self::build_vectors_streaming(
                                dense_vectors,
                                schema,
                                trained,
                                &mut **w,
                            )?;
                        }
                        Ok(())
                    },
                    || -> Result<()> {
                        if let Some(ref mut w) = sparse_writer {
                            Self::build_sparse_streaming(&mut sparse_vectors, schema, &mut **w)?;
                        }
                        Ok(())
                    },
                )
            },
        );
        postings_result?;
        store_result?;
        vectors_result?;
        sparse_result?;
        term_dict_writer.finish()?;
        postings_writer.finish()?;
        store_writer.finish()?;
        if let Some(w) = vectors_writer {
            w.finish()?;
        }
        if let Some(w) = sparse_writer {
            w.finish()?;
        }
        drop(position_offsets);
        drop(sparse_vectors);

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

    /// Stream dense vectors directly to disk (zero-buffer for vector data).
    ///
    /// Computes sizes deterministically (no trial serialization needed), writes
    /// a small header, then streams each field's raw f32 data directly to the writer.
    fn build_vectors_streaming(
        dense_vectors: FxHashMap<u32, DenseVectorBuilder>,
        schema: &Schema,
        trained: Option<&super::TrainedVectorStructures>,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::dsl::{DenseVectorQuantization, VectorIndexType};

        let mut fields: Vec<(u32, DenseVectorBuilder)> = dense_vectors
            .into_iter()
            .filter(|(_, b)| b.len() > 0)
            .collect();
        fields.sort_by_key(|(id, _)| *id);

        if fields.is_empty() {
            return Ok(());
        }

        // Resolve quantization config per field from schema
        let quants: Vec<DenseVectorQuantization> = fields
            .iter()
            .map(|(field_id, _)| {
                schema
                    .get_field_entry(Field(*field_id))
                    .and_then(|e| e.dense_vector_config.as_ref())
                    .map(|c| c.quantization)
                    .unwrap_or(DenseVectorQuantization::F32)
            })
            .collect();

        // Compute sizes using deterministic formula (no serialization needed)
        let mut field_sizes: Vec<usize> = Vec::with_capacity(fields.len());
        for (i, (_field_id, builder)) in fields.iter().enumerate() {
            field_sizes.push(FlatVectorData::serialized_binary_size(
                builder.dim,
                builder.len(),
                quants[i],
            ));
        }

        use crate::segment::format::{DenseVectorTocEntry, write_dense_toc_and_footer};

        // Data-first format: stream field data, then write TOC + footer at end.
        // Data starts at file offset 0 → mmap page-aligned, no alignment copies.
        let mut toc: Vec<DenseVectorTocEntry> = Vec::with_capacity(fields.len() * 2);
        let mut current_offset = 0u64;

        // Pre-build ANN indexes in parallel across fields.
        // Each field's ANN build is independent (different vectors, different centroids).
        let ann_blobs: Vec<(u32, u8, Vec<u8>)> = if let Some(trained) = trained {
            fields
                .par_iter()
                .filter_map(|(field_id, builder)| {
                    let config = schema
                        .get_field_entry(Field(*field_id))
                        .and_then(|e| e.dense_vector_config.as_ref())?;

                    let dim = builder.dim;
                    let blob = match config.index_type {
                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(field_id) => {
                            let centroids = &trained.centroids[field_id];
                            let (mut index, codebook) =
                                super::ann_build::new_ivf_rabitq(dim, centroids);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, &codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_ivf_rabitq(index, codebook)
                                .map(|b| (super::ann_build::IVF_RABITQ_TYPE, b))
                        }
                        VectorIndexType::ScaNN
                            if trained.centroids.contains_key(field_id)
                                && trained.codebooks.contains_key(field_id) =>
                        {
                            let centroids = &trained.centroids[field_id];
                            let codebook = &trained.codebooks[field_id];
                            let mut index = super::ann_build::new_scann(dim, centroids, codebook);
                            for (i, (doc_id, ordinal)) in builder.doc_ids.iter().enumerate() {
                                let v = &builder.vectors[i * dim..(i + 1) * dim];
                                index.add_vector(centroids, codebook, *doc_id, *ordinal, v);
                            }
                            super::ann_build::serialize_scann(index, codebook)
                                .map(|b| (super::ann_build::SCANN_TYPE, b))
                        }
                        _ => return None,
                    };
                    match blob {
                        Ok((index_type, bytes)) => {
                            log::info!(
                                "[segment_build] built ANN(type={}) for field {} ({} vectors, {} bytes)",
                                index_type,
                                field_id,
                                builder.doc_ids.len(),
                                bytes.len()
                            );
                            Some((*field_id, index_type, bytes))
                        }
                        Err(e) => {
                            log::warn!(
                                "[segment_build] ANN serialize failed for field {}: {}",
                                field_id,
                                e
                            );
                            None
                        }
                    }
                })
                .collect()
        } else {
            Vec::new()
        };

        // Stream each field's flat data directly (builder → disk, no intermediate buffer)
        for (i, (_field_id, builder)) in fields.into_iter().enumerate() {
            let data_offset = current_offset;
            FlatVectorData::serialize_binary_from_flat_streaming(
                builder.dim,
                &builder.vectors,
                &builder.doc_ids,
                quants[i],
                writer,
            )
            .map_err(crate::Error::Io)?;
            current_offset += field_sizes[i] as u64;
            toc.push(DenseVectorTocEntry {
                field_id: _field_id,
                index_type: super::ann_build::FLAT_TYPE,
                offset: data_offset,
                size: field_sizes[i] as u64,
            });
            // Pad to 8-byte boundary so next field's mmap slice is aligned
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
            // builder dropped here, freeing vector memory before next field
        }

        // Write ANN blob entries after flat entries
        for (field_id, index_type, blob) in ann_blobs {
            let data_offset = current_offset;
            let blob_len = blob.len() as u64;
            writer.write_all(&blob)?;
            current_offset += blob_len;
            toc.push(DenseVectorTocEntry {
                field_id,
                index_type,
                offset: data_offset,
                size: blob_len,
            });
            let pad = (8 - (current_offset % 8)) % 8;
            if pad > 0 {
                writer.write_all(&[0u8; 8][..pad as usize])?;
                current_offset += pad;
            }
        }

        // Write TOC + footer
        write_dense_toc_and_footer(writer, current_offset, &toc)?;

        Ok(())
    }

    /// Stream sparse vectors directly to disk (footer-based format).
    ///
    /// Data is written first (one dim at a time), then the TOC and footer
    /// are appended. This matches the dense vectors format pattern.
    fn build_sparse_streaming(
        sparse_vectors: &mut FxHashMap<u32, SparseVectorBuilder>,
        schema: &Schema,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use crate::segment::format::{SparseFieldToc, write_sparse_toc_and_footer};
        use crate::structures::{BlockSparsePostingList, WeightQuantization};

        if sparse_vectors.is_empty() {
            return Ok(());
        }

        // Collect and sort fields
        let mut field_ids: Vec<u32> = sparse_vectors.keys().copied().collect();
        field_ids.sort_unstable();

        let mut field_tocs: Vec<SparseFieldToc> = Vec::new();
        let mut current_offset = 0u64;

        for &field_id in &field_ids {
            let builder = sparse_vectors.get_mut(&field_id).unwrap();
            if builder.is_empty() {
                continue;
            }

            let field = crate::dsl::Field(field_id);
            let sparse_config = schema
                .get_field_entry(field)
                .and_then(|e| e.sparse_vector_config.as_ref());

            let quantization = sparse_config
                .map(|c| c.weight_quantization)
                .unwrap_or(WeightQuantization::Float32);

            let block_size = sparse_config.map(|c| c.block_size).unwrap_or(128);
            let pruning_fraction = sparse_config.and_then(|c| c.posting_list_pruning);

            // Parallel: sort + prune + serialize each dimension independently,
            // then write sequentially. Each dimension's pipeline is CPU-bound
            // and fully independent.
            let mut dims: Vec<_> = std::mem::take(&mut builder.postings).into_iter().collect();
            dims.sort_unstable_by_key(|(id, _)| *id);

            let serialized_dims: Vec<(u32, Vec<u8>)> = dims
                .into_par_iter()
                .map(|(dim_id, mut postings)| {
                    postings.sort_unstable_by_key(|(doc_id, ordinal, _)| (*doc_id, *ordinal));

                    if let Some(fraction) = pruning_fraction
                        && postings.len() > 1
                        && fraction < 1.0
                    {
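                        // Keep only the top ceil(fraction * n) postings by absolute
                        // weight, then restore (doc_id, ordinal) order for block encoding.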
                        let original_len = postings.len();
                        postings.sort_by(|a, b| {
                            b.2.abs()
                                .partial_cmp(&a.2.abs())
                                .unwrap_or(std::cmp::Ordering::Equal)
                        });
                        let keep = ((original_len as f64 * fraction as f64).ceil() as usize).max(1);
                        postings.truncate(keep);
                        postings.sort_unstable_by_key(|(d, o, _)| (*d, *o));
                    }

                    let block_list = BlockSparsePostingList::from_postings_with_block_size(
                        &postings,
                        quantization,
                        block_size,
                    )
                    .map_err(crate::Error::Io)?;

                    let mut buf = Vec::new();
                    block_list.serialize(&mut buf).map_err(crate::Error::Io)?;
                    Ok((dim_id, buf))
                })
                .collect::<Result<Vec<_>>>()?;

            // Sequential write (preserves deterministic offset tracking)
            let mut dim_entries: Vec<(u32, u64, u32)> = Vec::with_capacity(serialized_dims.len());
            for (dim_id, buf) in &serialized_dims {
                writer.write_all(buf)?;
                dim_entries.push((*dim_id, current_offset, buf.len() as u32));
                current_offset += buf.len() as u64;
            }

            if !dim_entries.is_empty() {
                field_tocs.push(SparseFieldToc {
                    field_id,
                    quantization: quantization as u8,
                    dims: dim_entries,
                });
            }
        }

        if field_tocs.is_empty() {
            return Ok(());
        }

        let toc_offset = current_offset;
        write_sparse_toc_and_footer(writer, toc_offset, &field_tocs).map_err(crate::Error::Io)?;

        Ok(())
    }

    /// Stream positions directly to disk, returning only the offset map.
    ///
    /// Consumes the position_index and writes each position posting list
    /// directly to the writer, tracking offsets for the postings phase.
    fn build_positions_streaming(
        position_index: HashMap<TermKey, PositionPostingListBuilder>,
        term_interner: &Rodeo,
        writer: &mut dyn Write,
    ) -> Result<FxHashMap<Vec<u8>, (u64, u32)>> {
        use crate::structures::PositionPostingList;

        let mut position_offsets: FxHashMap<Vec<u8>, (u64, u32)> = FxHashMap::default();

        // Consume HashMap into Vec for sorting (owned, no borrowing)
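        // Key layout: [field_id as 4 LE bytes][UTF-8 term bytes]. The term-dict keys in
        // build_postings_streaming use the same layout, so position offsets can later be
        // joined back to terms by key.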
        let mut entries: Vec<(Vec<u8>, PositionPostingListBuilder)> = position_index
            .into_iter()
            .map(|(term_key, pos_builder)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(size_of::<u32>() + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, pos_builder)
            })
            .collect();

        entries.sort_by(|a, b| a.0.cmp(&b.0));

        let mut current_offset = 0u64;
        let mut buf = Vec::new();

        for (key, pos_builder) in entries {
            let mut pos_list = PositionPostingList::with_capacity(pos_builder.postings.len());
            for (doc_id, positions) in pos_builder.postings {
                pos_list.push(doc_id, positions);
            }

            // Serialize to reusable buffer, then write
            buf.clear();
            pos_list.serialize(&mut buf).map_err(crate::Error::Io)?;
            writer.write_all(&buf)?;

            position_offsets.insert(key, (current_offset, buf.len() as u32));
            current_offset += buf.len() as u64;
        }

        Ok(position_offsets)
    }

    /// Stream postings directly to disk.
    ///
    /// Parallel serialization of posting lists, then sequential streaming of
    /// term dict and postings data directly to writers (no Vec<u8> accumulation).
    fn build_postings_streaming(
        inverted_index: HashMap<TermKey, PostingListBuilder>,
        term_interner: Rodeo,
        position_offsets: &FxHashMap<Vec<u8>, (u64, u32)>,
        term_dict_writer: &mut dyn Write,
        postings_writer: &mut dyn Write,
    ) -> Result<()> {
        // Phase 1: Consume HashMap into sorted Vec (frees HashMap overhead)
        let mut term_entries: Vec<(Vec<u8>, PostingListBuilder)> = inverted_index
            .into_iter()
            .map(|(term_key, posting_list)| {
                let term_str = term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        drop(term_interner);

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Phase 2: Parallel serialization
        // For inline-eligible terms (no positions, few postings), extract doc_ids/tfs
        // directly from CompactPosting without creating an intermediate PostingList.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let has_positions = position_offsets.contains_key(&key);

                // Fast path: try inline first (avoids PostingList + BlockPostingList allocs)
                if !has_positions {
                    let doc_ids: Vec<u32> =
                        posting_builder.postings.iter().map(|p| p.doc_id).collect();
                    let term_freqs: Vec<u32> = posting_builder
                        .postings
                        .iter()
                        .map(|p| p.term_freq as u32)
                        .collect();
                    if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                        return Ok((key, SerializedPosting::Inline(inline)));
                    }
                }

                // Slow path: build full PostingList → BlockPostingList → serialize
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let mut posting_bytes = Vec::new();
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut posting_bytes)?;
                let result = SerializedPosting::External {
                    bytes: posting_bytes,
                    doc_count: full_postings.doc_count(),
                };

                Ok((key, result))
            })
            .collect::<Result<Vec<_>>>()?;

        // Phase 3: Stream directly to writers (no intermediate Vec<u8> accumulation)
        let mut postings_offset = 0u64;
        let mut writer = SSTableWriter::<_, TermInfo>::new(term_dict_writer);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_len = bytes.len() as u32;
                    postings_writer.write_all(&bytes)?;

                    let info = if let Some(&(pos_offset, pos_len)) = position_offsets.get(&key) {
                        TermInfo::external_with_positions(
                            postings_offset,
                            posting_len,
                            doc_count,
                            pos_offset,
                            pos_len,
                        )
                    } else {
                        TermInfo::external(postings_offset, posting_len, doc_count)
                    };
                    postings_offset += posting_len as u64;
                    info
                }
            };

            writer.insert(&key, &term_info)?;
        }

        let _ = writer.finish()?;
        Ok(())
    }

    /// Stream compressed document store directly to disk.
    ///
    /// Reads pre-serialized document bytes from temp file and passes them
    /// directly to the store writer via `store_raw`, avoiding the
    /// deserialize→Document→reserialize roundtrip entirely.
    fn build_store_streaming(
        store_path: &PathBuf,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
        writer: &mut dyn Write,
    ) -> Result<()> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
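        // SAFETY assumption: the temp store file was fully flushed in `build()` and is
        // private to this builder, so it is not modified while the mapping is alive.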
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            writer,
            num_compression_threads,
            compression_level,
        );

        // Stream pre-serialized doc bytes directly — no deserialization needed.
        // Temp file format: [doc_len: u32 LE][doc_bytes: doc_len bytes] repeated.
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            store_writer.store_raw(doc_bytes)?;
            offset += doc_len;
        }

        store_writer.finish()?;
        Ok(())
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
    }
}