hermes_core/segment/builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: terms are interned using `lasso`, so each distinct term is allocated once
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: documents are written to a temp file immediately instead of being buffered in memory
//! - **Memory-mapped intermediate file**: the temp store is mmapped at build time, reducing memory pressure
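//!
//! # Example
//!
//! A minimal usage sketch (hypothetical setup: the `schema`, `doc`, and `dir`
//! values stand in for whatever `crate::dsl` and `crate::directories` provide):
//!
//! ```ignore
//! let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//!
//! // Documents are streamed to a temp file as they are added.
//! let doc_id = builder.add_document(doc)?;
//!
//! // `build` consumes the builder and writes the finished segment files.
//! let meta = builder.build(&dir, SegmentId(uuid::Uuid::new_v4())).await?;
//! ```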

use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::wand::WandData;
use crate::{DocId, Result};

/// Size of the document store buffer before writing to disk
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Interned term key combining field and term
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// Compact posting entry for in-memory storage
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// In-memory posting list for a term
struct PostingListBuilder {
    /// In-memory postings
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Add a posting, merging if same doc_id as last
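    ///
    /// ```ignore
    /// // Sketch: two hits for the same doc collapse into one posting.
    /// let mut b = PostingListBuilder::new();
    /// b.add(7, 1);
    /// b.add(7, 2); // merged: tf becomes 3
    /// assert_eq!(b.len(), 1);
    /// ```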
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Clamp once so both the merge and push paths saturate instead of truncating
        let tf = term_freq.min(u16::MAX as u32) as u16;
        // Check if we can merge with the last posting
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(tf);
            return;
        }
        self.postings.push(CompactPosting { doc_id, term_freq: tf });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

/// Intermediate result for parallel posting serialization
enum SerializedPosting {
    /// Inline posting (small enough to fit in TermInfo)
    Inline(TermInfo),
    /// External posting with serialized bytes
    External { bytes: Vec<u8>, doc_count: u32 },
}

/// Statistics for debugging segment builder performance
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector
    pub doc_field_lengths_size: usize,
}

/// Configuration for segment builder
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
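
// Example override (a sketch; the values are illustrative, not tuned):
//
//     let config = SegmentBuilderConfig {
//         compression_level: CompressionLevel(3), // faster, larger store
//         posting_map_capacity: 10_000,           // small batches
//         ..Default::default()
//     };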

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index: term key -> posting list
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
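    /// so a lookup is `doc_field_lengths[doc_id as usize * num_indexed_fields + slot]`
    /// (with `slot` taken from `field_to_slot`)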
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,
}

impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Get current statistics for debugging performance
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
    /// Instead of allocating a String per token, we:
    /// 1. Iterate over whitespace-split words
    /// 2. Build the lowercase token in a reusable buffer
    /// 3. Intern directly from the buffer
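    ///
    /// For example, the input `"Hello, hello WORLD"` produces the interned
    /// terms `hello` (tf = 2) and `world` (tf = 1); non-alphanumeric
    /// characters are dropped.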
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly.
        // NOTE: this fast path does its own whitespace tokenization and does not
        // consult tokenizers registered via `set_tokenizer`.
        for word in text.split_whitespace() {
            // Build lowercase token in reusable buffer
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            // Intern the term directly from buffer - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Write document to streaming store
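    ///
    /// Record format: `[u32 little-endian length][document bytes]`, matching
    /// the boundary scan in `build_store_parallel`.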
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Build the final segment
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        // Extract data needed for parallel processing
        let store_path = self.store_path.clone();
        let schema = self.schema.clone();
        let num_compression_threads = self.config.num_compression_threads;
        let compression_level = self.config.compression_level;

        // Build postings and document store in parallel
        let (postings_result, store_result) = rayon::join(
            || self.build_postings(),
            || {
                Self::build_store_parallel(
                    &store_path,
                    &schema,
                    num_compression_threads,
                    compression_level,
                )
            },
        );

        let (term_dict_data, postings_data) = postings_result?;
        let store_data = store_result?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

    /// Build postings from inverted index
    ///
    /// Uses parallel processing to serialize posting lists concurrently.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        // Phase 1: Collect term keys, then sort them in parallel
        // Key format: field_id (4 bytes) + term bytes
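        // e.g. field 2, term "rust" -> [0x02, 0x00, 0x00, 0x00] ++ b"rust"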
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        // Sort by key for SSTable ordering
        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Phase 2: Parallel serialization of posting lists
        // Each term's posting list is serialized independently
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                // Build posting list from in-memory postings
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                // Build term info
                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                    SerializedPosting::Inline(inline)
                } else {
                    // Serialize to local buffer
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        // Phase 3: Sequential assembly (must be sequential for offset calculation)
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);
                    TermInfo::external(posting_offset, posting_len, doc_count)
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file (static method for parallel execution)
    ///
    /// Uses parallel processing to deserialize documents concurrently.
    fn build_store_parallel(
        store_path: &Path,
        schema: &Schema,
        num_compression_threads: usize,
        compression_level: CompressionLevel,
    ) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        let file = File::open(store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Phase 1: Parse document boundaries (sequential, fast)
        let mut doc_ranges: Vec<(usize, usize)> = Vec::new();
        let mut offset = 0usize;
        while offset + 4 <= mmap.len() {
            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            doc_ranges.push((offset, doc_len));
            offset += doc_len;
        }

        // Phase 2: Parallel deserialization of documents
        // NOTE: a record that fails to deserialize is silently skipped, which
        // would shift subsequent doc ids; records are assumed intact because
        // this process wrote them.
        let docs: Vec<Document> = doc_ranges
            .into_par_iter()
            .filter_map(|(start, len)| {
                let doc_bytes = &mmap[start..start + len];
                super::store::deserialize_document(doc_bytes, schema).ok()
            })
            .collect();

        // Phase 3: Write to store (compression is already parallel in EagerParallelStoreWriter)
        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            num_compression_threads,
            compression_level,
        );

        for doc in &docs {
            store_writer.store(doc, schema)?;
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
    }
}
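
// A minimal sketch of the clamp/merge behavior of `PostingListBuilder`
// (added for illustration; not part of the original test suite).
#[cfg(test)]
mod builder_tests {
    use super::*;

    #[test]
    fn posting_builder_merges_and_saturates() {
        let mut b = PostingListBuilder::new();

        // Frequencies above u16::MAX are clamped on insert...
        b.add(1, 70_000);
        assert_eq!(b.postings[0].term_freq, u16::MAX);

        // ...and merging the same doc_id saturates instead of overflowing.
        b.add(1, 1);
        assert_eq!(b.len(), 1);
        assert_eq!(b.postings[0].term_freq, u16::MAX);

        // A different doc_id starts a new posting.
        b.add(2, 3);
        assert_eq!(b.len(), 2);
    }
}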