hermes_core/segment/
builder.rs

1//! Streaming segment builder with optimized memory usage
2//!
3//! Key optimizations:
4//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
5//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
6//! - **Streaming document store**: Documents written to disk immediately
7//! - **Incremental posting flush**: Large posting lists flushed to temp file
8//! - **Memory-mapped intermediate files**: Reduces memory pressure
9//! - **Arena allocation**: Batch allocations for reduced fragmentation
10
11#[cfg(feature = "native")]
12use std::fs::{File, OpenOptions};
13#[cfg(feature = "native")]
14use std::io::{BufWriter, Write};
15#[cfg(feature = "native")]
16use std::path::PathBuf;
17#[cfg(feature = "native")]
18use std::sync::Arc;
19
20#[cfg(feature = "native")]
21use hashbrown::HashMap;
22#[cfg(feature = "native")]
23use lasso::{Rodeo, Spur};
24#[cfg(feature = "native")]
25use rustc_hash::FxHashMap;
26
27#[cfg(feature = "native")]
28use super::store::StoreWriter;
29#[cfg(feature = "native")]
30use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
31#[cfg(feature = "native")]
32use crate::compression::CompressionLevel;
33#[cfg(feature = "native")]
34use crate::directories::{Directory, DirectoryWriter};
35#[cfg(feature = "native")]
36use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
37#[cfg(feature = "native")]
38use crate::structures::{PostingList, SSTableWriter, TermInfo};
39#[cfg(feature = "native")]
40use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
41#[cfg(feature = "native")]
42use crate::wand::WandData;
43#[cfg(feature = "native")]
44use crate::{DocId, Result};
45
/// Threshold for flushing a posting list to disk (number of in-memory postings).
/// Lower threshold = more aggressive spilling = less memory pressure.
///
/// Only the in-memory part of a list is compared against this value
/// (see `SpillablePostingList::needs_spill`).
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 1_000;

/// Capacity of the `BufWriter` placed in front of the temporary files.
///
/// Used for both the posting spill file and the streaming document store file.
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16 MiB

/// Number of shards for the inverted index.
///
/// Must stay a power of two: `TermKey::shard` selects the shard with
/// `id & (NUM_INDEX_SHARDS - 1)` instead of a modulo.
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;
59
/// Key of a single posting list: the owning field id plus the interned term.
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Index of the shard that owns this key.
    ///
    /// Only the interned term id takes part in the choice; the field id does not.
    #[inline]
    fn shard(&self) -> usize {
        // NUM_INDEX_SHARDS is a power of two, so masking the low bits is
        // equivalent to `id % NUM_INDEX_SHARDS`.
        const SHARD_MASK: usize = NUM_INDEX_SHARDS - 1;

        // `Spur` wraps a non-zero u32; pull out the raw integer.
        let raw_id = self.term.into_inner().get() as usize;
        raw_id & SHARD_MASK
    }
}
78
/// Inverted index split into `NUM_INDEX_SHARDS` independent hash maps.
///
/// A key always lives in the shard returned by `TermKey::shard`, so each
/// individual map stays smaller than one monolithic table would be.
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    /// Create all shards up front, each pre-sized for `capacity_per_shard` terms.
    fn new(capacity_per_shard: usize) -> Self {
        let shards = (0..NUM_INDEX_SHARDS)
            .map(|_| HashMap::with_capacity(capacity_per_shard))
            .collect();
        Self { shards }
    }

    /// Look up the posting list for `key`, if one has been created.
    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        let shard = &mut self.shards[key.shard()];
        shard.get_mut(key)
    }

    /// Return the posting list for `key`, creating an empty one on first use.
    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        let shard = &mut self.shards[key.shard()];
        shard.entry(key).or_insert_with(SpillablePostingList::new)
    }

    /// Number of distinct term keys across all shards.
    fn len(&self) -> usize {
        let mut total = 0;
        for shard in &self.shards {
            total += shard.len();
        }
        total
    }

    /// Number of postings currently held in memory (spilled postings excluded).
    fn total_postings_in_memory(&self) -> usize {
        let mut in_memory = 0;
        for shard in &self.shards {
            for list in shard.values() {
                in_memory += list.memory.len();
            }
        }
        in_memory
    }

    /// Shard size distribution as `(min, max, avg)` terms per shard.
    ///
    /// Returns `(0, 0, 0)` when there are no shards; `avg` uses integer division.
    fn shard_stats(&self) -> (usize, usize, usize) {
        let shard_count = self.shards.len();
        if shard_count == 0 {
            return (0, 0, 0);
        }

        let (smallest, largest, total) = self.shards.iter().map(|shard| shard.len()).fold(
            (usize::MAX, 0usize, 0usize),
            |(lo, hi, sum), size| (lo.min(size), hi.max(size), sum + size),
        );
        (smallest, largest, total / shard_count)
    }
}
135
/// Borrowing iterator that walks every `(key, posting list)` pair, shard by shard.
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    /// Shards that have not been started yet.
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    /// Entries of the shard currently being drained (`None` before the first shard).
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Serve from the shard in progress while it still has entries.
            let from_current = self.current.as_mut().and_then(|entries| entries.next());
            if from_current.is_some() {
                return from_current;
            }

            // Current shard is exhausted (or not started): advance, or finish
            // once no shards remain.
            let next_shard = self.shards.next()?;
            self.current = Some(next_shard.iter());
        }
    }
}
162
/// Allows `for (key, list) in &index` over every entry of every shard.
#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        // No shard is opened yet; the iterator picks up the first one lazily.
        let shards = self.shards.iter();
        ShardedIndexIter {
            shards,
            current: None,
        }
    }
}
175
/// Compact posting entry for in-memory storage.
///
/// The same two fields (doc id, then term frequency) are what gets written to
/// the spill file for each posting.
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    /// Document the term occurs in.
    doc_id: DocId,
    /// Occurrences of the term in that document, narrowed to `u16` by
    /// `SpillablePostingList::add`.
    term_freq: u16, // Most term frequencies fit in u16
}
183
184/// Posting list that can spill to disk when too large
185#[cfg(feature = "native")]
186struct SpillablePostingList {
187    /// In-memory postings (hot data)
188    memory: Vec<CompactPosting>,
189    /// Offset in spill file where flushed postings start (-1 if none)
190    spill_offset: i64,
191    /// Number of postings in spill file
192    spill_count: u32,
193}
194
195#[cfg(feature = "native")]
196impl SpillablePostingList {
197    fn new() -> Self {
198        Self {
199            memory: Vec::new(),
200            spill_offset: -1,
201            spill_count: 0,
202        }
203    }
204
205    #[allow(dead_code)]
206    fn with_capacity(capacity: usize) -> Self {
207        Self {
208            memory: Vec::with_capacity(capacity),
209            spill_offset: -1,
210            spill_count: 0,
211        }
212    }
213
214    #[inline]
215    fn add(&mut self, doc_id: DocId, term_freq: u32) {
216        // Merge with last posting if same doc_id
217        if let Some(last) = self.memory.last_mut()
218            && last.doc_id == doc_id
219        {
220            last.term_freq = last.term_freq.saturating_add(term_freq as u16);
221            return;
222        }
223        self.memory.push(CompactPosting {
224            doc_id,
225            term_freq: term_freq.min(u16::MAX as u32) as u16,
226        });
227    }
228
229    fn total_count(&self) -> usize {
230        self.memory.len() + self.spill_count as usize
231    }
232
233    fn needs_spill(&self) -> bool {
234        self.memory.len() >= POSTING_FLUSH_THRESHOLD
235    }
236}
237
/// Statistics for debugging segment builder performance.
///
/// A point-in-time snapshot produced by `SegmentBuilder::stats`.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique (field, term) keys in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms); spilled postings are not counted
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Number of entries in the flat doc_field_lengths vector
    pub doc_field_lengths_size: usize,
    /// Smallest number of terms held by any shard
    pub shard_min: usize,
    /// Largest number of terms held by any shard
    pub shard_max: usize,
    /// Average number of terms per shard (integer division)
    pub shard_avg: usize,
    /// Spill file offset (bytes written to the spill file so far)
    pub spill_bytes: u64,
}
259
/// Configuration for segment builder.
///
/// `Default` provides values based on the system temp directory and CPU count.
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary files (document store stream and posting spill file)
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap (total; divided evenly across shards)
    pub posting_map_capacity: usize,
}
275
#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    /// Defaults: system temp directory, compression level 7, one compression
    /// thread per CPU reported by `num_cpus`, and capacities of 1 000 000
    /// interned terms and 500 000 posting lists.
    fn default() -> Self {
        let temp_dir = std::env::temp_dir();
        let num_compression_threads = num_cpus::get();

        Self {
            temp_dir,
            compression_level: CompressionLevel(7),
            num_compression_threads,
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
288
/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
///
/// Both temporary files are removed again when the builder is dropped.
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    /// Per-field tokenizer overrides; fields without an entry are tokenized
    /// with `LowercaseTokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Sharded inverted index for better cache locality
    /// Each shard is a smaller hashmap that fits better in CPU cache
    inverted_index: ShardedInvertedIndex,

    /// Streaming document store writer (length-prefixed serialized documents)
    store_file: BufWriter<File>,
    /// Location of the temporary document store file
    store_path: PathBuf,

    /// Spill file for large posting lists; created lazily on the first spill
    spill_file: Option<BufWriter<File>>,
    /// Location of the temporary spill file (may not exist if nothing spilled)
    spill_path: PathBuf,
    /// Total number of bytes written to the spill file so far
    spill_offset: u64,

    /// Next doc id to hand out; equals the number of documents added so far
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    /// Number of indexed text fields, i.e. the stride of `doc_field_lengths`
    num_indexed_fields: usize,
    /// Maps a field id to its column within one document's stride
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for terms that need spilling
    terms_to_spill_buffer: Vec<TermKey>,
}
341
342#[cfg(feature = "native")]
343impl SegmentBuilder {
344    /// Create a new segment builder
345    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
346        let segment_id = uuid::Uuid::new_v4();
347        let store_path = config
348            .temp_dir
349            .join(format!("hermes_store_{}.tmp", segment_id));
350        let spill_path = config
351            .temp_dir
352            .join(format!("hermes_spill_{}.tmp", segment_id));
353
354        let store_file = BufWriter::with_capacity(
355            SPILL_BUFFER_SIZE,
356            OpenOptions::new()
357                .create(true)
358                .write(true)
359                .truncate(true)
360                .open(&store_path)?,
361        );
362
363        // Count indexed fields for compact field length storage
364        let mut num_indexed_fields = 0;
365        let mut field_to_slot = FxHashMap::default();
366        for (field, entry) in schema.fields() {
367            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
368                field_to_slot.insert(field.0, num_indexed_fields);
369                num_indexed_fields += 1;
370            }
371        }
372
373        Ok(Self {
374            schema,
375            tokenizers: FxHashMap::default(),
376            term_interner: Rodeo::new(),
377            inverted_index: ShardedInvertedIndex::new(
378                config.posting_map_capacity / NUM_INDEX_SHARDS,
379            ),
380            store_file,
381            store_path,
382            spill_file: None,
383            spill_path,
384            spill_offset: 0,
385            next_doc_id: 0,
386            field_stats: FxHashMap::default(),
387            doc_field_lengths: Vec::new(),
388            num_indexed_fields,
389            field_to_slot,
390            wand_data: None,
391            local_tf_buffer: FxHashMap::default(),
392            terms_to_spill_buffer: Vec::new(),
393            config,
394        })
395    }
396
397    /// Create with pre-computed WAND data
398    pub fn with_wand_data(
399        schema: Schema,
400        config: SegmentBuilderConfig,
401        wand_data: Arc<WandData>,
402    ) -> Result<Self> {
403        let mut builder = Self::new(schema, config)?;
404        builder.wand_data = Some(wand_data);
405        Ok(builder)
406    }
407
408    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
409        self.tokenizers.insert(field, tokenizer);
410    }
411
    /// Number of documents added so far.
    ///
    /// Doc ids are handed out sequentially starting at 0, so this is also the
    /// id the next document will receive.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }
415
416    /// Get current statistics for debugging performance
417    pub fn stats(&self) -> SegmentBuilderStats {
418        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
419        SegmentBuilderStats {
420            num_docs: self.next_doc_id,
421            unique_terms: self.inverted_index.len(),
422            postings_in_memory: self.inverted_index.total_postings_in_memory(),
423            interned_strings: self.term_interner.len(),
424            doc_field_lengths_size: self.doc_field_lengths.len(),
425            shard_min,
426            shard_max,
427            shard_avg,
428            spill_bytes: self.spill_offset,
429        }
430    }
431
432    /// Add a document - streams to disk immediately
433    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
434        let doc_id = self.next_doc_id;
435        self.next_doc_id += 1;
436
437        // Initialize field lengths for this document
438        let base_idx = self.doc_field_lengths.len();
439        self.doc_field_lengths
440            .resize(base_idx + self.num_indexed_fields, 0);
441
442        for (field, value) in doc.field_values() {
443            let entry = self.schema.get_field_entry(*field);
444            if entry.is_none() || !entry.unwrap().indexed {
445                continue;
446            }
447
448            let entry = entry.unwrap();
449            match (&entry.field_type, value) {
450                (FieldType::Text, FieldValue::Text(text)) => {
451                    let token_count = self.index_text_field(*field, doc_id, text)?;
452
453                    // Update field statistics
454                    let stats = self.field_stats.entry(field.0).or_default();
455                    stats.total_tokens += token_count as u64;
456                    stats.doc_count += 1;
457
458                    // Store field length compactly
459                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
460                        self.doc_field_lengths[base_idx + slot] = token_count;
461                    }
462                }
463                (FieldType::U64, FieldValue::U64(v)) => {
464                    self.index_numeric_field(*field, doc_id, *v)?;
465                }
466                (FieldType::I64, FieldValue::I64(v)) => {
467                    self.index_numeric_field(*field, doc_id, *v as u64)?;
468                }
469                (FieldType::F64, FieldValue::F64(v)) => {
470                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
471                }
472                _ => {}
473            }
474        }
475
476        // Stream document to disk immediately
477        self.write_document_to_store(&doc)?;
478
479        Ok(doc_id)
480    }
481
482    /// Index a text field using interned terms
483    ///
484    /// Optimization: aggregate term frequencies within the document first,
485    /// then do a single insert per unique term. This reduces hash lookups
486    /// from O(total_tokens) to O(unique_terms_in_doc).
487    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
488        let default_tokenizer = LowercaseTokenizer;
489        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
490            .tokenizers
491            .get(&field)
492            .map(|t| t.as_ref())
493            .unwrap_or(&default_tokenizer);
494
495        let tokens = tokenizer.tokenize(text);
496        let token_count = tokens.len() as u32;
497
498        // Phase 1: Aggregate term frequencies within this document
499        // Reuse buffer to avoid allocations
500        self.local_tf_buffer.clear();
501
502        for token in tokens {
503            // Intern the term - O(1) amortized
504            let term_spur = self.term_interner.get_or_intern(&token.text);
505            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
506        }
507
508        // Phase 2: Insert aggregated terms into inverted index
509        // Now we only do one inverted_index lookup per unique term in doc
510        // Reuse buffer for terms to spill
511        let field_id = field.0;
512        self.terms_to_spill_buffer.clear();
513
514        for (&term_spur, &tf) in &self.local_tf_buffer {
515            let term_key = TermKey {
516                field: field_id,
517                term: term_spur,
518            };
519
520            let posting = self.inverted_index.get_or_insert(term_key);
521            posting.add(doc_id, tf);
522
523            // Mark for spilling if needed
524            if posting.needs_spill() {
525                self.terms_to_spill_buffer.push(term_key);
526            }
527        }
528
529        // Phase 3: Spill large posting lists
530        for i in 0..self.terms_to_spill_buffer.len() {
531            let term_key = self.terms_to_spill_buffer[i];
532            self.spill_posting_list(term_key)?;
533        }
534
535        Ok(token_count)
536    }
537
538    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
539        // For numeric fields, we use a special encoding
540        let term_str = format!("__num_{}", value);
541        let term_spur = self.term_interner.get_or_intern(&term_str);
542
543        let term_key = TermKey {
544            field: field.0,
545            term: term_spur,
546        };
547
548        let posting = self.inverted_index.get_or_insert(term_key);
549        posting.add(doc_id, 1);
550
551        Ok(())
552    }
553
554    /// Write document to streaming store
555    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
556        use byteorder::{LittleEndian, WriteBytesExt};
557
558        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;
559
560        self.store_file
561            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
562        self.store_file.write_all(&doc_bytes)?;
563
564        Ok(())
565    }
566
567    /// Spill a large posting list to disk
568    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
569        use byteorder::{LittleEndian, WriteBytesExt};
570
571        let posting = self.inverted_index.get_mut(&term_key).unwrap();
572
573        // Initialize spill file if needed
574        if self.spill_file.is_none() {
575            let file = OpenOptions::new()
576                .create(true)
577                .write(true)
578                .read(true)
579                .truncate(true)
580                .open(&self.spill_path)?;
581            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
582        }
583
584        let spill_file = self.spill_file.as_mut().unwrap();
585
586        // Record spill offset if this is first spill for this term
587        if posting.spill_offset < 0 {
588            posting.spill_offset = self.spill_offset as i64;
589        }
590
591        // Write postings to spill file
592        for p in &posting.memory {
593            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
594            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
595            self.spill_offset += 6; // 4 bytes doc_id + 2 bytes term_freq
596        }
597
598        posting.spill_count += posting.memory.len() as u32;
599        posting.memory.clear();
600        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4); // Keep some capacity
601
602        Ok(())
603    }
604
605    /// Build the final segment
606    pub async fn build<D: Directory + DirectoryWriter>(
607        mut self,
608        dir: &D,
609        segment_id: SegmentId,
610    ) -> Result<SegmentMeta> {
611        // Flush any buffered data
612        self.store_file.flush()?;
613        if let Some(ref mut spill) = self.spill_file {
614            spill.flush()?;
615        }
616
617        let files = SegmentFiles::new(segment_id.0);
618
619        // Build term dictionary and postings
620        let (term_dict_data, postings_data) = self.build_postings()?;
621
622        // Build document store from streamed data
623        let store_data = self.build_store_from_stream()?;
624
625        // Write to directory
626        dir.write(&files.term_dict, &term_dict_data).await?;
627        dir.write(&files.postings, &postings_data).await?;
628        dir.write(&files.store, &store_data).await?;
629
630        let meta = SegmentMeta {
631            id: segment_id.0,
632            num_docs: self.next_doc_id,
633            field_stats: self.field_stats.clone(),
634        };
635
636        dir.write(&files.meta, &meta.serialize()?).await?;
637
638        // Cleanup temp files
639        let _ = std::fs::remove_file(&self.store_path);
640        let _ = std::fs::remove_file(&self.spill_path);
641
642        Ok(meta)
643    }
644
645    /// Build postings from inverted index
646    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
647        use std::collections::BTreeMap;
648
649        // We need to sort terms for SSTable, so collect into BTreeMap
650        // Key format: field_id (4 bytes) + term bytes
651        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();
652
653        for (term_key, posting_list) in &self.inverted_index {
654            let term_str = self.term_interner.resolve(&term_key.term);
655            let mut key = Vec::with_capacity(4 + term_str.len());
656            key.extend_from_slice(&term_key.field.to_le_bytes());
657            key.extend_from_slice(term_str.as_bytes());
658            sorted_terms.insert(key, posting_list);
659        }
660
661        let mut term_dict = Vec::new();
662        let mut postings = Vec::new();
663        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);
664
665        // Memory-map spill file if it exists
666        let spill_mmap = if self.spill_file.is_some() {
667            drop(self.spill_file.take()); // Close writer
668            let file = File::open(&self.spill_path)?;
669            Some(unsafe { memmap2::Mmap::map(&file)? })
670        } else {
671            None
672        };
673
674        for (key, spill_posting) in sorted_terms {
675            // Reconstruct full posting list
676            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());
677
678            // Read spilled postings first (they come before in-memory ones)
679            if spill_posting.spill_offset >= 0
680                && let Some(ref mmap) = spill_mmap
681            {
682                let mut offset = spill_posting.spill_offset as usize;
683                for _ in 0..spill_posting.spill_count {
684                    let doc_id = u32::from_le_bytes([
685                        mmap[offset],
686                        mmap[offset + 1],
687                        mmap[offset + 2],
688                        mmap[offset + 3],
689                    ]);
690                    let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
691                    full_postings.push(doc_id, term_freq as u32);
692                    offset += 6;
693                }
694            }
695
696            // Add in-memory postings
697            for p in &spill_posting.memory {
698                full_postings.push(p.doc_id, p.term_freq as u32);
699            }
700
701            // Build term info
702            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
703            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();
704
705            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
706                inline
707            } else {
708                let posting_offset = postings.len() as u64;
709                let block_list =
710                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
711                block_list.serialize(&mut postings)?;
712                TermInfo::external(
713                    posting_offset,
714                    (postings.len() as u64 - posting_offset) as u32,
715                    full_postings.doc_count(),
716                )
717            };
718
719            writer.insert(&key, &term_info)?;
720        }
721
722        writer.finish()?;
723        Ok((term_dict, postings))
724    }
725
726    /// Build document store from streamed temp file
727    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
728        // Memory-map the temp store file
729        drop(std::mem::replace(
730            &mut self.store_file,
731            BufWriter::new(File::create("/dev/null")?),
732        ));
733
734        let file = File::open(&self.store_path)?;
735        let mmap = unsafe { memmap2::Mmap::map(&file)? };
736
737        // Re-compress with proper block structure
738        let mut store_data = Vec::new();
739        let mut store_writer =
740            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);
741
742        let mut offset = 0usize;
743        while offset < mmap.len() {
744            if offset + 4 > mmap.len() {
745                break;
746            }
747
748            let doc_len = u32::from_le_bytes([
749                mmap[offset],
750                mmap[offset + 1],
751                mmap[offset + 2],
752                mmap[offset + 3],
753            ]) as usize;
754            offset += 4;
755
756            if offset + doc_len > mmap.len() {
757                break;
758            }
759
760            let doc_bytes = &mmap[offset..offset + doc_len];
761            offset += doc_len;
762
763            // Deserialize and re-store with proper compression
764            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
765                store_writer.store(&doc, &self.schema)?;
766            }
767        }
768
769        store_writer.finish()?;
770        Ok(store_data)
771    }
772}
773
#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    /// Remove both temporary files. This is best effort: errors (for example
    /// a file that was never created or is already gone) are ignored.
    fn drop(&mut self) {
        for temp_path in [&self.store_path, &self.spill_path] {
            let _ = std::fs::remove_file(temp_path);
        }
    }
}