hermes_core/segment/
builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Incremental posting flush**: Large posting lists flushed to temp file
//! - **Memory-mapped intermediate files**: Reduces memory pressure
//! - **Arena allocation**: Batch allocations for reduced fragmentation
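//!
//! # Example
//!
//! A minimal usage sketch; construction of `schema`, `docs`, `dir`, and
//! `segment_id` is assumed to happen elsewhere:
//!
//! ```rust,ignore
//! let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//! for doc in docs {
//!     builder.add_document(doc)?;
//! }
//! // Consumes the builder and writes term dict, postings, store, and meta to `dir`.
//! let meta = builder.build(&dir, segment_id).await?;
//! ```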

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

/// Threshold for flushing a posting list to disk (number of postings)
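/// At roughly 8 bytes per in-memory `CompactPosting` (after alignment), this
/// caps a single term's unspilled posting list at about 800 KB.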
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 100_000;

/// Size of the posting spill buffer before writing to disk
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Interned term key combining field and term
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

/// Compact posting entry for in-memory storage
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16, // Most term frequencies fit in u16
}

/// Posting list that can spill to disk when too large
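///
/// Spilled postings are written as fixed 6-byte little-endian records,
/// `[doc_id: u32][term_freq: u16]`, by `spill_posting_list` and read back in
/// `build_postings`.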
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// In-memory postings (hot data)
    memory: Vec<CompactPosting>,
    /// Offset in spill file where flushed postings start (-1 if none)
    spill_offset: i64,
    /// Number of postings in spill file
    spill_count: u32,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_offset: -1,
            spill_count: 0,
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_offset: -1,
            spill_count: 0,
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Merge with last posting if same doc_id
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            // Saturate rather than truncate when the frequency exceeds u16::MAX,
            // matching the clamping applied on the push path below.
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn total_count(&self) -> usize {
        self.memory.len() + self.spill_count as usize
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

/// Configuration for segment builder
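///
/// # Example
///
/// A sketch of overriding a couple of fields (values are illustrative):
///
/// ```rust,ignore
/// let config = SegmentBuilderConfig {
///     compression_level: CompressionLevel(3),
///     posting_map_capacity: 100_000,
///     ..Default::default()
/// };
/// ```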
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
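///
/// Temporary store/spill files are created in `config.temp_dir` and removed
/// after `build` completes (or when the builder is dropped).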
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Inverted index using interned term keys
    /// hashbrown HashMap has better cache locality than BTreeMap
    inverted_index: HashMap<TermKey, SpillablePostingList>,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for large posting lists
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
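    /// Index for (doc, field): `doc_id as usize * num_indexed_fields + field_to_slot[&field_id]`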
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            config,
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

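    /// Override the tokenizer used for a specific field.
    ///
    /// A minimal sketch, assuming `BoxedTokenizer` is a boxed tokenizer trait object:
    ///
    /// ```rust,ignore
    /// builder.set_tokenizer(title_field, Box::new(LowercaseTokenizer));
    /// ```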
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        for token in tokens {
            // Intern the term - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&token.text);

            let term_key = TermKey {
                field: field.0,
                term: term_spur,
            };

            // hashbrown entry API is faster than BTreeMap
            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(SpillablePostingList::new);

            posting.add(doc_id, 1);

            // Check if we need to spill this posting list
            if posting.needs_spill() {
                self.spill_posting_list(term_key)?;
            }
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(SpillablePostingList::new);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Write document to streaming store
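    ///
    /// Each record is framed as `[len: u32 LE][serialized document bytes]`;
    /// `build_store_from_stream` re-reads this framing to build the final store.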
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spill a large posting list to disk
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        // Initialize spill file if needed
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Record spill offset if this is first spill for this term
        if posting.spill_offset < 0 {
            posting.spill_offset = self.spill_offset as i64;
        }

        // Write postings to spill file
        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4 bytes doc_id + 2 bytes term_freq
        }

        posting.spill_count += posting.memory.len() as u32;
        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4); // Keep some capacity

        Ok(())
    }

    /// Build the final segment
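    ///
    /// Consumes the builder: writes the `term_dict`, `postings`, `store`, and
    /// `meta` files through `dir`, then removes the temporary store/spill files.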
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        // Build term dictionary and postings
        let (term_dict_data, postings_data) = self.build_postings()?;

        // Build document store from streamed data
        let store_data = self.build_store_from_stream()?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Build postings from inverted index
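    ///
    /// Term dictionary keys are `[field_id: u32 LE][term bytes]`, emitted in
    /// sorted order so the SSTable writer receives monotonically increasing keys.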
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // We need to sort terms for SSTable, so collect into BTreeMap
        // Key format: field_id (4 bytes) + term bytes
        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map spill file if it exists
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take()); // Close writer
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            // Reconstruct full posting list
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Read spilled postings first (they come before in-memory ones)
            if spill_posting.spill_offset >= 0
                && let Some(ref mmap) = spill_mmap
            {
                let mut offset = spill_posting.spill_offset as usize;
                for _ in 0..spill_posting.spill_count {
                    let doc_id = u32::from_le_bytes([
                        mmap[offset],
                        mmap[offset + 1],
                        mmap[offset + 2],
                        mmap[offset + 3],
                    ]);
                    let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                    full_postings.push(doc_id, term_freq as u32);
                    offset += 6;
                }
            }

            // Add in-memory postings
            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            // Build term info
            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Memory-map the temp store file
        // Drop the streaming writer first; re-point the field at the temp path
        // itself (append mode, never written again) rather than a
        // platform-specific sink such as /dev/null.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(OpenOptions::new().append(true).open(&self.store_path)?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Re-compress with proper block structure
        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            // Deserialize and re-store with proper compression
            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}
603}