hermes_core/segment/builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Incremental posting flush**: Large posting lists flushed to temp file
//! - **Memory-mapped intermediate files**: Reduces memory pressure
//! - **Arena allocation**: Batch allocations for reduced fragmentation
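//!
//! # Example (sketch)
//!
//! A minimal sketch of the intended flow, assuming a `Schema`, an iterator of
//! `Document`s, a `Directory + DirectoryWriter` implementation, and a
//! `SegmentId` are already available (the bindings here are illustrative):
//!
//! ```ignore
//! let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//!
//! // Documents are streamed to a temporary store file as they are added.
//! for doc in documents {
//!     builder.add_document(doc)?;
//! }
//!
//! // Finalize: writes the term dictionary, postings, store and meta files.
//! let meta = builder.build(&directory, segment_id).await?;
//! ```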

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

/// Threshold for flushing a posting list to disk (number of postings)
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 10_000;

/// Size of the posting spill buffer before writing to disk
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Number of shards for the inverted index
/// Power of 2 for fast modulo via bitwise AND
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

/// Interned term key combining field and term
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Get shard index for this term key (uses term's internal id for distribution)
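    ///
    /// Because `NUM_INDEX_SHARDS` is a power of two, the bitwise AND below is
    /// equivalent to a modulo: with 64 shards, `id & 63 == id % 64`.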
    #[inline]
    fn shard(&self) -> usize {
        // Spur wraps NonZero<u32>, get the raw value for sharding
        // Using bitwise AND since NUM_INDEX_SHARDS is power of 2
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

/// Sharded inverted index for better cache locality
/// Each shard is a smaller hashmap that fits better in CPU cache
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    /// Get or insert a posting list for the given term key
    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    /// Get total number of in-memory postings across all shards
    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.memory.len())
            .sum()
    }

    /// Get shard size distribution (min, max, avg)
    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}

/// Iterator over all entries in the sharded index
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            // Move to next shard
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

/// Compact posting entry for in-memory storage
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16, // Most term frequencies fit in u16
}

/// Posting list that can spill to disk when too large
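///
/// Consecutive `add` calls with the same `doc_id` merge into a single posting:
/// e.g. `add(7, 2)` followed by `add(7, 3)` leaves one entry for doc 7 with a
/// term frequency of 5.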
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// In-memory postings (hot data)
    memory: Vec<CompactPosting>,
    /// Contiguous runs already flushed to the spill file, as (byte offset, posting count) pairs.
    /// Runs from different terms interleave in the file, so each flush records its own run.
    spill_runs: Vec<(u64, u32)>,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_runs: Vec::new(),
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_runs: Vec::new(),
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Merge with last posting if same doc_id
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            // Clamp (rather than truncate) before the saturating add so oversized
            // frequencies don't wrap when narrowed to u16
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn total_count(&self) -> usize {
        let spilled: usize = self.spill_runs.iter().map(|&(_, count)| count as usize).sum();
        self.memory.len() + spilled
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

/// Statistics for debugging segment builder performance
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector
    pub doc_field_lengths_size: usize,
    /// Shard distribution (min, max, avg terms per shard)
    pub shard_min: usize,
    pub shard_max: usize,
    pub shard_avg: usize,
    /// Spill file offset (bytes written to disk)
    pub spill_bytes: u64,
}

/// Configuration for segment builder
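///
/// # Example (sketch)
///
/// A minimal sketch of overriding a couple of defaults; the values and the
/// `/tmp/hermes` path are illustrative, not recommendations:
///
/// ```ignore
/// let config = SegmentBuilderConfig {
///     temp_dir: std::path::PathBuf::from("/tmp/hermes"),
///     compression_level: CompressionLevel(3),
///     ..SegmentBuilderConfig::default()
/// };
/// ```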
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Sharded inverted index for better cache locality
    /// Each shard is a smaller hashmap that fits better in CPU cache
    inverted_index: ShardedInvertedIndex,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for large posting lists
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
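    /// The length for (doc, field) lives at index `doc_id as usize * num_indexed_fields + slot`,
    /// where `slot` comes from `field_to_slot` (see `add_document`)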
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for terms that need spilling
    terms_to_spill_buffer: Vec<TermKey>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Get current statistics for debugging performance
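    ///
    /// A sketch of how this might be used to monitor memory pressure while
    /// indexing (the logging is illustrative):
    ///
    /// ```ignore
    /// let s = builder.stats();
    /// println!(
    ///     "{} docs, {} unique terms, {} postings in RAM, {} spill bytes",
    ///     s.num_docs, s.unique_terms, s.postings_in_memory, s.spill_bytes
    /// );
    /// ```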
    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
    /// Instead of allocating a String per token, we:
    /// 1. Iterate over whitespace-split words
    /// 2. Build lowercase token in a reusable buffer
    /// 3. Intern directly from the buffer
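    ///
    /// For illustration, the inline tokenizer turns `"Hello, World-42!"` into
    /// the terms `hello` and `world42` (non-alphanumeric characters are dropped,
    /// the rest lowercased).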
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
        for word in text.split_whitespace() {
            // Build lowercase token in reusable buffer
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            // Intern the term directly from buffer - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        // Reuse buffer for terms to spill
        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            // Mark for spilling if needed
            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Phase 3: Spill large posting lists
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Write document to streaming store
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spill a large posting list to disk
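    ///
    /// Each spilled posting is a fixed 6-byte little-endian record: 4 bytes of
    /// `doc_id` followed by 2 bytes of `term_freq`. A reader (see
    /// `build_postings`) decodes one record roughly like this:
    ///
    /// ```ignore
    /// let doc_id = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
    /// let term_freq = u16::from_le_bytes([buf[4], buf[5]]);
    /// ```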
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        // Initialize spill file if needed
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Record this flush as its own run: runs from different terms interleave
        // in the spill file, so each term tracks its own (offset, count) pairs
        let run_start = self.spill_offset;
        let run_count = posting.memory.len() as u32;

        // Write postings to spill file
        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4 bytes doc_id + 2 bytes term_freq
        }

        posting.spill_runs.push((run_start, run_count));
        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4); // Keep some capacity

        Ok(())
    }

    /// Build the final segment
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        // Build term dictionary and postings
        let (term_dict_data, postings_data) = self.build_postings()?;

        // Build document store from streamed data
        let store_data = self.build_store_from_stream()?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Build postings from inverted index
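    ///
    /// Term dictionary keys are the 4-byte little-endian field id followed by
    /// the raw term bytes, e.g. (sketch):
    ///
    /// ```ignore
    /// let mut key = Vec::with_capacity(4 + term.len());
    /// key.extend_from_slice(&field_id.to_le_bytes());
    /// key.extend_from_slice(term.as_bytes());
    /// ```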
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // We need to sort terms for SSTable, so collect into BTreeMap
        // Key format: field_id (4 bytes) + term bytes
        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map spill file if it exists
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take()); // Close writer
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            // Reconstruct full posting list
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Read spilled postings first (they were flushed before the in-memory ones)
            if let Some(ref mmap) = spill_mmap {
                for &(run_offset, run_count) in &spill_posting.spill_runs {
                    let mut offset = run_offset as usize;
                    for _ in 0..run_count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            // Add in-memory postings
            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            // Build term info
            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Close the temp store writer by swapping in a throwaway handle
        // (assumes a Unix-like /dev/null) so the temp file can be memory-mapped
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Re-compress with proper block structure
        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            // Deserialize and re-store with proper compression
            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}