hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11#[cfg(feature = "native")]
12use std::fs::{File, OpenOptions};
13#[cfg(feature = "native")]
14use std::io::{BufWriter, Write};
15#[cfg(feature = "native")]
16use std::path::PathBuf;
17#[cfg(feature = "native")]
18use std::sync::Arc;
19
20#[cfg(feature = "native")]
21use hashbrown::HashMap;
22#[cfg(feature = "native")]
23use lasso::{Rodeo, Spur};
24#[cfg(feature = "native")]
25use rustc_hash::FxHashMap;
26
27#[cfg(feature = "native")]
28use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
29#[cfg(feature = "native")]
30use crate::compression::CompressionLevel;
31#[cfg(feature = "native")]
32use crate::directories::{Directory, DirectoryWriter};
33#[cfg(feature = "native")]
34use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
35#[cfg(feature = "native")]
36use crate::structures::{PostingList, SSTableWriter, TermInfo};
37#[cfg(feature = "native")]
38use crate::tokenizer::BoxedTokenizer;
39#[cfg(feature = "native")]
40use crate::wand::WandData;
41#[cfg(feature = "native")]
42use crate::{DocId, Result};
43
/// Capacity of the `BufWriter` placed in front of the temporary document
/// store file (16 MiB).
#[cfg(feature = "native")]
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Number of shards the inverted index is split into.
///
/// Must be a power of two: `TermKey::shard` selects a shard with
/// `id & (NUM_INDEX_SHARDS - 1)` instead of a modulo.
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

// Compile-time guard for the bit-mask sharding: a non-power-of-two shard count
// would silently leave some shards unreachable.
#[cfg(feature = "native")]
const _: () = assert!(
    NUM_INDEX_SHARDS.is_power_of_two(),
    "NUM_INDEX_SHARDS must be a power of two"
);
52
/// Key of the inverted index: a field id paired with an interned term.
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    /// Numeric id of the field the term was found in.
    field: u32,
    /// Handle of the term inside the builder's string interner.
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Returns the index of the shard this key belongs to.
    ///
    /// Only the interned term id takes part in the choice; the field id is
    /// ignored, so the same term lands in the same shard for every field.
    #[inline]
    fn shard(&self) -> usize {
        // Valid as a modulo only because NUM_INDEX_SHARDS is a power of two.
        const SHARD_MASK: usize = NUM_INDEX_SHARDS - 1;

        // `into_inner()` exposes the raw non-zero id behind the `Spur`.
        let raw_id = self.term.into_inner().get() as usize;
        raw_id & SHARD_MASK
    }
}
71
/// Inverted index split into `NUM_INDEX_SHARDS` independent hash maps.
///
/// A key is routed to exactly one shard by `TermKey::shard`, so each map stays
/// smaller than a single monolithic map would be.
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, PostingListBuilder>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    /// Creates `NUM_INDEX_SHARDS` empty shards, each pre-sized for
    /// `capacity_per_shard` entries.
    fn new(capacity_per_shard: usize) -> Self {
        let shards: Vec<_> = (0..NUM_INDEX_SHARDS)
            .map(|_| HashMap::with_capacity(capacity_per_shard))
            .collect();
        Self { shards }
    }

    /// Returns the posting list stored under `key`, creating an empty one
    /// first if the key has not been seen yet.
    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut PostingListBuilder {
        let shard = &mut self.shards[key.shard()];
        shard.entry(key).or_insert_with(PostingListBuilder::new)
    }

    /// Number of distinct keys across all shards.
    fn len(&self) -> usize {
        self.shards
            .iter()
            .fold(0, |total, shard| total + shard.len())
    }

    /// Number of postings currently held in memory, summed over every
    /// posting list of every shard.
    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .map(|shard| {
                shard
                    .values()
                    .map(|list| list.postings.len())
                    .sum::<usize>()
            })
            .sum()
    }

    /// Returns `(min, max, avg)` of the per-shard key counts.
    ///
    /// The average uses integer division; all three values are zero when
    /// there are no shards.
    fn shard_stats(&self) -> (usize, usize, usize) {
        let shard_count = self.shards.len();
        if shard_count == 0 {
            return (0, 0, 0);
        }

        let (smallest, largest, total) = self
            .shards
            .iter()
            .map(|shard| shard.len())
            .fold((usize::MAX, 0, 0), |(lo, hi, sum), size| {
                (lo.min(size), hi.max(size), sum + size)
            });

        (smallest, largest, total / shard_count)
    }
}
123
/// Borrowing iterator that walks every `(key, posting list)` pair of a
/// `ShardedInvertedIndex`, one shard after another.
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    /// Shards that have not been started yet.
    shards: std::slice::Iter<'a, HashMap<TermKey, PostingListBuilder>>,
    /// Entry iterator of the shard being drained; `None` before the first shard.
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, PostingListBuilder>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a PostingListBuilder);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve from the shard in progress while it still has entries.
            let pending = self
                .current
                .as_mut()
                .and_then(|shard_iter| shard_iter.next());
            if let Some(entry) = pending {
                return Some(entry);
            }

            // Current shard exhausted (or not started): advance to the next
            // shard, or finish once every shard has been visited.
            let next_shard = self.shards.next()?;
            self.current = Some(next_shard.iter());
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a PostingListBuilder);
    type IntoIter = ShardedIndexIter<'a>;

    /// Starts an iteration positioned before the first shard.
    fn into_iter(self) -> Self::IntoIter {
        let shards = self.shards.iter();
        ShardedIndexIter {
            shards,
            current: None,
        }
    }
}
163
/// One in-memory posting: a document id and how often the term occurs in it.
///
/// The frequency is stored as `u16` to keep the entry small; larger counts
/// are clamped to `u16::MAX`.
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

/// Growable in-memory posting list for a single term.
#[cfg(feature = "native")]
struct PostingListBuilder {
    /// Postings in insertion order.
    postings: Vec<CompactPosting>,
}

#[cfg(feature = "native")]
impl PostingListBuilder {
    /// Creates an empty posting list without allocating.
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Records `term_freq` occurrences of the term in `doc_id`.
    ///
    /// If the most recently added posting belongs to the same document the
    /// frequencies are summed; otherwise a new posting is appended. In both
    /// cases the stored frequency saturates at `u16::MAX`.
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Clamp before narrowing: a plain `as u16` cast would wrap, so e.g.
        // 65_536 additional occurrences would be merged as 0.
        let clamped = u16::try_from(term_freq).unwrap_or(u16::MAX);

        match self.postings.last_mut() {
            Some(last) if last.doc_id == doc_id => {
                last.term_freq = last.term_freq.saturating_add(clamped);
            }
            _ => self.postings.push(CompactPosting {
                doc_id,
                term_freq: clamped,
            }),
        }
    }

    /// Number of postings (distinct consecutive documents) in the list.
    fn len(&self) -> usize {
        self.postings.len()
    }
}
207
/// Snapshot of a segment builder's in-memory state, intended for debugging
/// and performance investigation.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Distinct `(field, term)` keys in the inverted index.
    pub unique_terms: usize,
    /// Postings held in memory, summed over all terms.
    pub postings_in_memory: usize,
    /// Strings stored in the term interner.
    pub interned_strings: usize,
    /// Element count of the flat per-document field-length vector.
    pub doc_field_lengths_size: usize,
    /// Smallest number of keys found in any index shard.
    pub shard_min: usize,
    /// Largest number of keys found in any index shard.
    pub shard_max: usize,
    /// Average number of keys per index shard (integer division).
    pub shard_avg: usize,
}
227
/// Tunables for a `SegmentBuilder`.
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory in which the builder creates its temporary files.
    pub temp_dir: PathBuf,
    /// Compression level used when the document store is written.
    pub compression_level: CompressionLevel,
    /// Number of threads handed to the parallel store compressor.
    pub num_compression_threads: usize,
    /// Requested initial capacity of the term interner.
    pub interner_capacity: usize,
    /// Requested initial capacity of the posting-list map; the builder
    /// divides it evenly across the index shards.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    /// Defaults: the OS temp directory, compression level 7, one compression
    /// thread per CPU reported by `num_cpus`, room for 1,000,000 interned
    /// terms and 500,000 posting lists.
    fn default() -> Self {
        let temp_dir = std::env::temp_dir();
        let num_compression_threads = num_cpus::get();

        Self {
            temp_dir,
            compression_level: CompressionLevel(7),
            num_compression_threads,
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
256
/// Builds one index segment from a stream of documents.
///
/// How the state below is used by the code in this file:
/// - Serialized documents are appended to a temporary file as they are added
///   and are only re-read when the segment is built.
/// - Terms are interned, so the inverted index stores small integer handles
///   rather than owned strings.
/// - The inverted index is a set of sharded `hashbrown` maps whose posting
///   lists are kept in memory (see `PostingListBuilder`) until the segment
///   is built.
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    /// Tokenizers registered per field through `set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner that maps each distinct term string to a `Spur` handle.
    term_interner: Rodeo,

    /// Sharded map from `(field, term)` to its in-memory posting list.
    inverted_index: ShardedInvertedIndex,

    /// Buffered writer over the temporary document store file.
    store_file: BufWriter<File>,
    /// Location of the temporary document store file; removed on drop.
    store_path: PathBuf,

    /// Id the next added document will receive; equals the number of
    /// documents added so far.
    next_doc_id: DocId,

    /// Per-field token and document counters, keyed by field id (for BM25F).
    field_stats: FxHashMap<u32, FieldStats>,

    /// Token counts of indexed text fields, stored flat, one row per document:
    /// `[doc0_slot0, doc0_slot1, ..., doc1_slot0, ...]`.
    doc_field_lengths: Vec<u32>,
    /// Row width of `doc_field_lengths`: the number of indexed text fields.
    num_indexed_fields: usize,
    /// Column (slot) of each indexed text field inside a row, keyed by field id.
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data supplied through `with_wand_data`.
    wand_data: Option<Arc<WandData>>,

    /// Scratch map for per-document term frequencies; cleared and reused for
    /// every indexed text value instead of being reallocated.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch string in which each normalized token is assembled before it
    /// is interned.
    token_buffer: String,
}
304
305#[cfg(feature = "native")]
306impl SegmentBuilder {
307    /// Create a new segment builder
308    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
309        let segment_id = uuid::Uuid::new_v4();
310        let store_path = config
311            .temp_dir
312            .join(format!("hermes_store_{}.tmp", segment_id));
313
314        let store_file = BufWriter::with_capacity(
315            STORE_BUFFER_SIZE,
316            OpenOptions::new()
317                .create(true)
318                .write(true)
319                .truncate(true)
320                .open(&store_path)?,
321        );
322
323        // Count indexed fields for compact field length storage
324        let mut num_indexed_fields = 0;
325        let mut field_to_slot = FxHashMap::default();
326        for (field, entry) in schema.fields() {
327            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
328                field_to_slot.insert(field.0, num_indexed_fields);
329                num_indexed_fields += 1;
330            }
331        }
332
333        Ok(Self {
334            schema,
335            tokenizers: FxHashMap::default(),
336            term_interner: Rodeo::new(),
337            inverted_index: ShardedInvertedIndex::new(
338                config.posting_map_capacity / NUM_INDEX_SHARDS,
339            ),
340            store_file,
341            store_path,
342            next_doc_id: 0,
343            field_stats: FxHashMap::default(),
344            doc_field_lengths: Vec::new(),
345            num_indexed_fields,
346            field_to_slot,
347            wand_data: None,
348            local_tf_buffer: FxHashMap::default(),
349            token_buffer: String::with_capacity(64),
350            config,
351        })
352    }
353
354    /// Create with pre-computed WAND data
355    pub fn with_wand_data(
356        schema: Schema,
357        config: SegmentBuilderConfig,
358        wand_data: Arc<WandData>,
359    ) -> Result<Self> {
360        let mut builder = Self::new(schema, config)?;
361        builder.wand_data = Some(wand_data);
362        Ok(builder)
363    }
364
365    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
366        self.tokenizers.insert(field, tokenizer);
367    }
368
369    pub fn num_docs(&self) -> u32 {
370        self.next_doc_id
371    }
372
373    /// Get current statistics for debugging performance
374    pub fn stats(&self) -> SegmentBuilderStats {
375        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
376        SegmentBuilderStats {
377            num_docs: self.next_doc_id,
378            unique_terms: self.inverted_index.len(),
379            postings_in_memory: self.inverted_index.total_postings_in_memory(),
380            interned_strings: self.term_interner.len(),
381            doc_field_lengths_size: self.doc_field_lengths.len(),
382            shard_min,
383            shard_max,
384            shard_avg,
385        }
386    }
387
388    /// Add a document - streams to disk immediately
389    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
390        let doc_id = self.next_doc_id;
391        self.next_doc_id += 1;
392
393        // Initialize field lengths for this document
394        let base_idx = self.doc_field_lengths.len();
395        self.doc_field_lengths
396            .resize(base_idx + self.num_indexed_fields, 0);
397
398        for (field, value) in doc.field_values() {
399            let entry = self.schema.get_field_entry(*field);
400            if entry.is_none() || !entry.unwrap().indexed {
401                continue;
402            }
403
404            let entry = entry.unwrap();
405            match (&entry.field_type, value) {
406                (FieldType::Text, FieldValue::Text(text)) => {
407                    let token_count = self.index_text_field(*field, doc_id, text)?;
408
409                    // Update field statistics
410                    let stats = self.field_stats.entry(field.0).or_default();
411                    stats.total_tokens += token_count as u64;
412                    stats.doc_count += 1;
413
414                    // Store field length compactly
415                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
416                        self.doc_field_lengths[base_idx + slot] = token_count;
417                    }
418                }
419                (FieldType::U64, FieldValue::U64(v)) => {
420                    self.index_numeric_field(*field, doc_id, *v)?;
421                }
422                (FieldType::I64, FieldValue::I64(v)) => {
423                    self.index_numeric_field(*field, doc_id, *v as u64)?;
424                }
425                (FieldType::F64, FieldValue::F64(v)) => {
426                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
427                }
428                _ => {}
429            }
430        }
431
432        // Stream document to disk immediately
433        self.write_document_to_store(&doc)?;
434
435        Ok(doc_id)
436    }
437
438    /// Index a text field using interned terms
439    ///
440    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
441    /// Instead of allocating a String per token, we:
442    /// 1. Iterate over whitespace-split words
443    /// 2. Build lowercase token in a reusable buffer
444    /// 3. Intern directly from the buffer
445    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
446        // Phase 1: Aggregate term frequencies within this document
447        // Reuse buffer to avoid allocations
448        self.local_tf_buffer.clear();
449
450        let mut token_count = 0u32;
451
452        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
453        for word in text.split_whitespace() {
454            // Build lowercase token in reusable buffer
455            self.token_buffer.clear();
456            for c in word.chars() {
457                if c.is_alphanumeric() {
458                    for lc in c.to_lowercase() {
459                        self.token_buffer.push(lc);
460                    }
461                }
462            }
463
464            if self.token_buffer.is_empty() {
465                continue;
466            }
467
468            token_count += 1;
469
470            // Intern the term directly from buffer - O(1) amortized
471            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
472            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
473        }
474
475        // Phase 2: Insert aggregated terms into inverted index
476        // Now we only do one inverted_index lookup per unique term in doc
477        let field_id = field.0;
478
479        for (&term_spur, &tf) in &self.local_tf_buffer {
480            let term_key = TermKey {
481                field: field_id,
482                term: term_spur,
483            };
484
485            let posting = self.inverted_index.get_or_insert(term_key);
486            posting.add(doc_id, tf);
487        }
488
489        Ok(token_count)
490    }
491
492    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
493        // For numeric fields, we use a special encoding
494        let term_str = format!("__num_{}", value);
495        let term_spur = self.term_interner.get_or_intern(&term_str);
496
497        let term_key = TermKey {
498            field: field.0,
499            term: term_spur,
500        };
501
502        let posting = self.inverted_index.get_or_insert(term_key);
503        posting.add(doc_id, 1);
504
505        Ok(())
506    }
507
508    /// Write document to streaming store
509    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
510        use byteorder::{LittleEndian, WriteBytesExt};
511
512        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
513
514        self.store_file
515            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
516        self.store_file.write_all(&doc_bytes)?;
517
518        Ok(())
519    }
520
521    /// Build the final segment
522    pub async fn build<D: Directory + DirectoryWriter>(
523        mut self,
524        dir: &D,
525        segment_id: SegmentId,
526    ) -> Result<SegmentMeta> {
527        // Flush any buffered data
528        self.store_file.flush()?;
529
530        let files = SegmentFiles::new(segment_id.0);
531
532        // Build term dictionary and postings
533        let (term_dict_data, postings_data) = self.build_postings()?;
534
535        // Build document store from streamed data
536        let store_data = self.build_store_from_stream()?;
537
538        // Write to directory
539        dir.write(&files.term_dict, &term_dict_data).await?;
540        dir.write(&files.postings, &postings_data).await?;
541        dir.write(&files.store, &store_data).await?;
542
543        let meta = SegmentMeta {
544            id: segment_id.0,
545            num_docs: self.next_doc_id,
546            field_stats: self.field_stats.clone(),
547        };
548
549        dir.write(&files.meta, &meta.serialize()?).await?;
550
551        // Cleanup temp files
552        let _ = std::fs::remove_file(&self.store_path);
553
554        Ok(meta)
555    }
556
557    /// Build postings from inverted index
558    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
559        use std::collections::BTreeMap;
560
561        // We need to sort terms for SSTable, so collect into BTreeMap
562        // Key format: field_id (4 bytes) + term bytes
563        let mut sorted_terms: BTreeMap<Vec<u8>, &PostingListBuilder> = BTreeMap::new();
564
565        for (term_key, posting_list) in &self.inverted_index {
566            let term_str = self.term_interner.resolve(&term_key.term);
567            let mut key = Vec::with_capacity(4 + term_str.len());
568            key.extend_from_slice(&term_key.field.to_le_bytes());
569            key.extend_from_slice(term_str.as_bytes());
570            sorted_terms.insert(key, posting_list);
571        }
572
573        let mut term_dict = Vec::new();
574        let mut postings = Vec::new();
575        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
576
577        for (key, posting_builder) in sorted_terms {
578            // Build posting list from in-memory postings
579            let mut full_postings = PostingList::with_capacity(posting_builder.len());
580            for p in &posting_builder.postings {
581                full_postings.push(p.doc_id, p.term_freq as u32);
582            }
583
584            // Build term info
585            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
586            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
587
588            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
589                inline
590            } else {
591                let posting_offset = postings.len() as u64;
592                let block_list =
593                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
594                block_list.serialize(&mut postings)?;
595                TermInfo::external(
596                    posting_offset,
597                    (postings.len() as u64 - posting_offset) as u32,
598                    full_postings.doc_count(),
599                )
600            };
601
602            writer.insert(&key, &term_info)?;
603        }
604
605        writer.finish()?;
606        Ok((term_dict, postings))
607    }
608
609    /// Build document store from streamed temp file
610    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
611        use super::store::EagerParallelStoreWriter;
612
613        // Memory-map the temp store file
614        drop(std::mem::replace(
615            &mut self.store_file,
616            BufWriter::new(File::create("/dev/null")?),
617        ));
618
619        let file = File::open(&self.store_path)?;
620        let mmap = unsafe { memmap2::Mmap::map(&file)? };
621
622        // Re-compress with proper block structure using parallel compression
623        let mut store_data = Vec::new();
624        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
625            &mut store_data,
626            self.config.num_compression_threads,
627            self.config.compression_level,
628        );
629
630        let mut offset = 0usize;
631        while offset < mmap.len() {
632            if offset + 4 > mmap.len() {
633                break;
634            }
635
636            let doc_len = u32::from_le_bytes([
637                mmap[offset],
638                mmap[offset + 1],
639                mmap[offset + 2],
640                mmap[offset + 3],
641            ]) as usize;
642            offset += 4;
643
644            if offset + doc_len > mmap.len() {
645                break;
646            }
647
648            let doc_bytes = &mmap[offset..offset + doc_len];
649            offset += doc_len;
650
651            // Deserialize and re-store with proper compression
652            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
653                store_writer.store(&doc, &self.schema)?;
654            }
655        }
656
657        store_writer.finish()?;
658        Ok(store_data)
659    }
660}
661
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    /// Removes the temporary document store file.
    ///
    /// Best effort: a failure (for example because the file is already gone)
    /// is deliberately ignored.
    fn drop(&mut self) {
        std::fs::remove_file(&self.store_path).ok();
    }
}