hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11#[cfg(feature = "native")]
12use std::fs::{File, OpenOptions};
13#[cfg(feature = "native")]
14use std::io::{BufWriter, Write};
15#[cfg(feature = "native")]
16use std::path::PathBuf;
17#[cfg(feature = "native")]
18use std::sync::Arc;
19
20#[cfg(feature = "native")]
21use hashbrown::HashMap;
22#[cfg(feature = "native")]
23use lasso::{Rodeo, Spur};
24#[cfg(feature = "native")]
25use rustc_hash::FxHashMap;
26
27#[cfg(feature = "native")]
28use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
29#[cfg(feature = "native")]
30use crate::compression::CompressionLevel;
31#[cfg(feature = "native")]
32use crate::directories::{Directory, DirectoryWriter};
33#[cfg(feature = "native")]
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35#[cfg(feature = "native")]
36use crate::structures::{PostingList, SSTableWriter, TermInfo};
37#[cfg(feature = "native")]
38use crate::tokenizer::BoxedTokenizer;
39#[cfg(feature = "native")]
40use crate::wand::WandData;
41#[cfg(feature = "native")]
42use crate::{DocId, Result};
43
44/// Size of the document store buffer before writing to disk
45#[cfg(feature = "native")]
46const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
47
48/// Interned term key combining field and term
49#[cfg(feature = "native")]
50#[derive(Clone, Copy, PartialEq, Eq, Hash)]
51struct TermKey {
52    field: u32,
53    term: Spur,
54}
55
56/// Compact posting entry for in-memory storage
57#[cfg(feature = "native")]
58#[derive(Clone, Copy)]
59struct CompactPosting {
60    doc_id: DocId,
61    term_freq: u16,
62}
63
64/// In-memory posting list for a term
65#[cfg(feature = "native")]
66struct PostingListBuilder {
67    /// In-memory postings
68    postings: Vec<CompactPosting>,
69}
70
71#[cfg(feature = "native")]
72impl PostingListBuilder {
73    fn new() -> Self {
74        Self {
75            postings: Vec::new(),
76        }
77    }
78
79    /// Add a posting, merging if same doc_id as last
80    #[inline]
81    fn add(&mut self, doc_id: DocId, term_freq: u32) {
82        // Check if we can merge with the last posting
83        if let Some(last) = self.postings.last_mut()
84            && last.doc_id == doc_id
85        {
86            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
87            return;
88        }
89        self.postings.push(CompactPosting {
90            doc_id,
91            term_freq: term_freq.min(u16::MAX as u32) as u16,
92        });
93    }
94
95    fn len(&self) -> usize {
96        self.postings.len()
97    }
98}
99
100/// Statistics for debugging segment builder performance
101#[cfg(feature = "native")]
102#[derive(Debug, Clone)]
103pub struct SegmentBuilderStats {
104    /// Number of documents indexed
105    pub num_docs: u32,
106    /// Number of unique terms in the inverted index
107    pub unique_terms: usize,
108    /// Total postings in memory (across all terms)
109    pub postings_in_memory: usize,
110    /// Number of interned strings
111    pub interned_strings: usize,
112    /// Size of doc_field_lengths vector
113    pub doc_field_lengths_size: usize,
114}
115
116/// Configuration for segment builder
117#[cfg(feature = "native")]
118#[derive(Clone)]
119pub struct SegmentBuilderConfig {
120    /// Directory for temporary spill files
121    pub temp_dir: PathBuf,
122    /// Compression level for document store
123    pub compression_level: CompressionLevel,
124    /// Number of threads for parallel compression
125    pub num_compression_threads: usize,
126    /// Initial capacity for term interner
127    pub interner_capacity: usize,
128    /// Initial capacity for posting lists hashmap
129    pub posting_map_capacity: usize,
130}
131
132#[cfg(feature = "native")]
133impl Default for SegmentBuilderConfig {
134    fn default() -> Self {
135        Self {
136            temp_dir: std::env::temp_dir(),
137            compression_level: CompressionLevel(7),
138            num_compression_threads: num_cpus::get(),
139            interner_capacity: 1_000_000,
140            posting_map_capacity: 500_000,
141        }
142    }
143}
144
145/// Segment builder with optimized memory usage
146///
147/// Features:
148/// - Streams documents to disk immediately (no in-memory document storage)
149/// - Uses string interning for terms (reduced allocations)
150/// - Uses hashbrown HashMap (faster than BTreeMap)
151#[cfg(feature = "native")]
152pub struct SegmentBuilder {
153    schema: Schema,
154    config: SegmentBuilderConfig,
155    tokenizers: FxHashMap<Field, BoxedTokenizer>,
156
157    /// String interner for terms - O(1) lookup and deduplication
158    term_interner: Rodeo,
159
160    /// Inverted index: term key -> posting list
161    inverted_index: HashMap<TermKey, PostingListBuilder>,
162
163    /// Streaming document store writer
164    store_file: BufWriter<File>,
165    store_path: PathBuf,
166
167    /// Document count
168    next_doc_id: DocId,
169
170    /// Per-field statistics for BM25F
171    field_stats: FxHashMap<u32, FieldStats>,
172
173    /// Per-document field lengths stored compactly
174    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
175    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
176    doc_field_lengths: Vec<u32>,
177    num_indexed_fields: usize,
178    field_to_slot: FxHashMap<u32, usize>,
179
180    /// Optional pre-computed WAND data for IDF values
181    wand_data: Option<Arc<WandData>>,
182
183    /// Reusable buffer for per-document term frequency aggregation
184    /// Avoids allocating a new hashmap for each document
185    local_tf_buffer: FxHashMap<Spur, u32>,
186
187    /// Reusable buffer for tokenization to avoid per-token String allocations
188    token_buffer: String,
189}
190
191#[cfg(feature = "native")]
192impl SegmentBuilder {
193    /// Create a new segment builder
194    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
195        let segment_id = uuid::Uuid::new_v4();
196        let store_path = config
197            .temp_dir
198            .join(format!("hermes_store_{}.tmp", segment_id));
199
200        let store_file = BufWriter::with_capacity(
201            STORE_BUFFER_SIZE,
202            OpenOptions::new()
203                .create(true)
204                .write(true)
205                .truncate(true)
206                .open(&store_path)?,
207        );
208
209        // Count indexed fields for compact field length storage
210        let mut num_indexed_fields = 0;
211        let mut field_to_slot = FxHashMap::default();
212        for (field, entry) in schema.fields() {
213            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
214                field_to_slot.insert(field.0, num_indexed_fields);
215                num_indexed_fields += 1;
216            }
217        }
218
219        Ok(Self {
220            schema,
221            tokenizers: FxHashMap::default(),
222            term_interner: Rodeo::new(),
223            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
224            store_file,
225            store_path,
226            next_doc_id: 0,
227            field_stats: FxHashMap::default(),
228            doc_field_lengths: Vec::new(),
229            num_indexed_fields,
230            field_to_slot,
231            wand_data: None,
232            local_tf_buffer: FxHashMap::default(),
233            token_buffer: String::with_capacity(64),
234            config,
235        })
236    }
237
238    /// Create with pre-computed WAND data
239    pub fn with_wand_data(
240        schema: Schema,
241        config: SegmentBuilderConfig,
242        wand_data: Arc<WandData>,
243    ) -> Result<Self> {
244        let mut builder = Self::new(schema, config)?;
245        builder.wand_data = Some(wand_data);
246        Ok(builder)
247    }
248
249    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
250        self.tokenizers.insert(field, tokenizer);
251    }
252
253    pub fn num_docs(&self) -> u32 {
254        self.next_doc_id
255    }
256
257    /// Get current statistics for debugging performance
258    pub fn stats(&self) -> SegmentBuilderStats {
259        let postings_in_memory: usize =
260            self.inverted_index.values().map(|p| p.postings.len()).sum();
261        SegmentBuilderStats {
262            num_docs: self.next_doc_id,
263            unique_terms: self.inverted_index.len(),
264            postings_in_memory,
265            interned_strings: self.term_interner.len(),
266            doc_field_lengths_size: self.doc_field_lengths.len(),
267        }
268    }
269
270    /// Add a document - streams to disk immediately
271    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
272        let doc_id = self.next_doc_id;
273        self.next_doc_id += 1;
274
275        // Initialize field lengths for this document
276        let base_idx = self.doc_field_lengths.len();
277        self.doc_field_lengths
278            .resize(base_idx + self.num_indexed_fields, 0);
279
280        for (field, value) in doc.field_values() {
281            let entry = self.schema.get_field_entry(*field);
282            if entry.is_none() || !entry.unwrap().indexed {
283                continue;
284            }
285
286            let entry = entry.unwrap();
287            match (&entry.field_type, value) {
288                (FieldType::Text, FieldValue::Text(text)) => {
289                    let token_count = self.index_text_field(*field, doc_id, text)?;
290
291                    // Update field statistics
292                    let stats = self.field_stats.entry(field.0).or_default();
293                    stats.total_tokens += token_count as u64;
294                    stats.doc_count += 1;
295
296                    // Store field length compactly
297                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
298                        self.doc_field_lengths[base_idx + slot] = token_count;
299                    }
300                }
301                (FieldType::U64, FieldValue::U64(v)) => {
302                    self.index_numeric_field(*field, doc_id, *v)?;
303                }
304                (FieldType::I64, FieldValue::I64(v)) => {
305                    self.index_numeric_field(*field, doc_id, *v as u64)?;
306                }
307                (FieldType::F64, FieldValue::F64(v)) => {
308                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
309                }
310                _ => {}
311            }
312        }
313
314        // Stream document to disk immediately
315        self.write_document_to_store(&doc)?;
316
317        Ok(doc_id)
318    }
319
320    /// Index a text field using interned terms
321    ///
322    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
323    /// Instead of allocating a String per token, we:
324    /// 1. Iterate over whitespace-split words
325    /// 2. Build lowercase token in a reusable buffer
326    /// 3. Intern directly from the buffer
327    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
328        // Phase 1: Aggregate term frequencies within this document
329        // Reuse buffer to avoid allocations
330        self.local_tf_buffer.clear();
331
332        let mut token_count = 0u32;
333
334        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
335        for word in text.split_whitespace() {
336            // Build lowercase token in reusable buffer
337            self.token_buffer.clear();
338            for c in word.chars() {
339                if c.is_alphanumeric() {
340                    for lc in c.to_lowercase() {
341                        self.token_buffer.push(lc);
342                    }
343                }
344            }
345
346            if self.token_buffer.is_empty() {
347                continue;
348            }
349
350            token_count += 1;
351
352            // Intern the term directly from buffer - O(1) amortized
353            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
354            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
355        }
356
357        // Phase 2: Insert aggregated terms into inverted index
358        // Now we only do one inverted_index lookup per unique term in doc
359        let field_id = field.0;
360
361        for (&term_spur, &tf) in &self.local_tf_buffer {
362            let term_key = TermKey {
363                field: field_id,
364                term: term_spur,
365            };
366
367            let posting = self
368                .inverted_index
369                .entry(term_key)
370                .or_insert_with(PostingListBuilder::new);
371            posting.add(doc_id, tf);
372        }
373
374        Ok(token_count)
375    }
376
377    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
378        // For numeric fields, we use a special encoding
379        let term_str = format!("__num_{}", value);
380        let term_spur = self.term_interner.get_or_intern(&term_str);
381
382        let term_key = TermKey {
383            field: field.0,
384            term: term_spur,
385        };
386
387        let posting = self
388            .inverted_index
389            .entry(term_key)
390            .or_insert_with(PostingListBuilder::new);
391        posting.add(doc_id, 1);
392
393        Ok(())
394    }
395
396    /// Write document to streaming store
397    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
398        use byteorder::{LittleEndian, WriteBytesExt};
399
400        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
401
402        self.store_file
403            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
404        self.store_file.write_all(&doc_bytes)?;
405
406        Ok(())
407    }
408
409    /// Build the final segment
410    pub async fn build<D: Directory + DirectoryWriter>(
411        mut self,
412        dir: &D,
413        segment_id: SegmentId,
414    ) -> Result<SegmentMeta> {
415        // Flush any buffered data
416        self.store_file.flush()?;
417
418        let files = SegmentFiles::new(segment_id.0);
419
420        // Build term dictionary and postings
421        let (term_dict_data, postings_data) = self.build_postings()?;
422
423        // Build document store from streamed data
424        let store_data = self.build_store_from_stream()?;
425
426        // Write to directory
427        dir.write(&files.term_dict, &term_dict_data).await?;
428        dir.write(&files.postings, &postings_data).await?;
429        dir.write(&files.store, &store_data).await?;
430
431        let meta = SegmentMeta {
432            id: segment_id.0,
433            num_docs: self.next_doc_id,
434            field_stats: self.field_stats.clone(),
435        };
436
437        dir.write(&files.meta, &meta.serialize()?).await?;
438
439        // Cleanup temp files
440        let _ = std::fs::remove_file(&self.store_path);
441
442        Ok(meta)
443    }
444
445    /// Build postings from inverted index
446    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
447        use std::collections::BTreeMap;
448
449        // We need to sort terms for SSTable, so collect into BTreeMap
450        // Key format: field_id (4 bytes) + term bytes
451        let mut sorted_terms: BTreeMap<Vec<u8>, &PostingListBuilder> = BTreeMap::new();
452
453        for (term_key, posting_list) in &self.inverted_index {
454            let term_str = self.term_interner.resolve(&term_key.term);
455            let mut key = Vec::with_capacity(4 + term_str.len());
456            key.extend_from_slice(&term_key.field.to_le_bytes());
457            key.extend_from_slice(term_str.as_bytes());
458            sorted_terms.insert(key, posting_list);
459        }
460
461        let mut term_dict = Vec::new();
462        let mut postings = Vec::new();
463        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
464
465        for (key, posting_builder) in sorted_terms {
466            // Build posting list from in-memory postings
467            let mut full_postings = PostingList::with_capacity(posting_builder.len());
468            for p in &posting_builder.postings {
469                full_postings.push(p.doc_id, p.term_freq as u32);
470            }
471
472            // Build term info
473            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
474            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
475
476            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
477                inline
478            } else {
479                let posting_offset = postings.len() as u64;
480                let block_list =
481                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
482                block_list.serialize(&mut postings)?;
483                TermInfo::external(
484                    posting_offset,
485                    (postings.len() as u64 - posting_offset) as u32,
486                    full_postings.doc_count(),
487                )
488            };
489
490            writer.insert(&key, &term_info)?;
491        }
492
493        writer.finish()?;
494        Ok((term_dict, postings))
495    }
496
497    /// Build document store from streamed temp file
498    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
499        use super::store::EagerParallelStoreWriter;
500
501        // Memory-map the temp store file
502        drop(std::mem::replace(
503            &mut self.store_file,
504            BufWriter::new(File::create("/dev/null")?),
505        ));
506
507        let file = File::open(&self.store_path)?;
508        let mmap = unsafe { memmap2::Mmap::map(&file)? };
509
510        // Re-compress with proper block structure using parallel compression
511        let mut store_data = Vec::new();
512        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
513            &mut store_data,
514            self.config.num_compression_threads,
515            self.config.compression_level,
516        );
517
518        let mut offset = 0usize;
519        while offset < mmap.len() {
520            if offset + 4 > mmap.len() {
521                break;
522            }
523
524            let doc_len = u32::from_le_bytes([
525                mmap[offset],
526                mmap[offset + 1],
527                mmap[offset + 2],
528                mmap[offset + 3],
529            ]) as usize;
530            offset += 4;
531
532            if offset + doc_len > mmap.len() {
533                break;
534            }
535
536            let doc_bytes = &mmap[offset..offset + doc_len];
537            offset += doc_len;
538
539            // Deserialize and re-store with proper compression
540            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
541                store_writer.store(&doc, &self.schema)?;
542            }
543        }
544
545        store_writer.finish()?;
546        Ok(store_data)
547    }
548}
549
550#[cfg(feature = "native")]
551impl Drop for SegmentBuilder {
552    fn drop(&mut self) {
553        // Cleanup temp files on drop
554        let _ = std::fs::remove_file(&self.store_path);
555    }
556}