hermes_core/segment/
builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Incremental posting flush**: Large posting lists flushed to temp file
//! - **Memory-mapped intermediate files**: Reduces memory pressure
//! - **Arena allocation**: Batch allocations for reduced fragmentation
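//!
//! A minimal usage sketch (hypothetical wiring; how `Schema`, `Document`, and the
//! `Directory` implementation are constructed lives elsewhere in the crate):
//!
//! ```rust,ignore
//! let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//! for doc in documents {
//!     // Each document is tokenized, added to the inverted index, and streamed
//!     // to a temp store file right away.
//!     builder.add_document(doc)?;
//! }
//! // Serialize the term dictionary, postings, and store into the directory.
//! let meta = builder.build(&directory, segment_id).await?;
//! ```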

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

/// Threshold for flushing a posting list to disk (number of postings)
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 10_000;

/// Buffer capacity for the temp-file writers (streamed document store and posting spill)
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Number of shards for the inverted index
/// Power of 2 for fast modulo via bitwise AND
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;
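
// The bitwise-AND modulo in `TermKey::shard` relies on the shard count being a
// power of two; enforce that at compile time.
#[cfg(feature = "native")]
const _: () = assert!(NUM_INDEX_SHARDS.is_power_of_two());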

/// Interned term key combining field and term
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Get shard index for this term key (uses term's internal id for distribution)
    #[inline]
    fn shard(&self) -> usize {
        // Spur wraps NonZero<u32>, get the raw value for sharding
        // Using bitwise AND since NUM_INDEX_SHARDS is power of 2
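        // e.g. a term with raw id 106 (0b0110_1010) maps to shard 106 & 63 = 42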
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

/// Sharded inverted index for better cache locality
/// Each shard is a smaller hashmap that fits better in CPU cache
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, PostingListMeta>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut PostingListMeta> {
        self.shards[key.shard()].get_mut(key)
    }

    /// Get or insert a posting list meta for the given term key
    /// The arena_len is used to set the start position for new terms
    #[inline]
    fn get_or_insert(&mut self, key: TermKey, arena_len: u32) -> &mut PostingListMeta {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(|| PostingListMeta::new(arena_len))
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    /// Get total number of in-memory postings across all shards
    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.len as usize)
            .sum()
    }

    /// Get shard size distribution (min, max, avg)
    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}

/// Iterator over all entries in the sharded index
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, PostingListMeta>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, PostingListMeta>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a PostingListMeta);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            // Move to next shard
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a PostingListMeta);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

/// Compact posting entry for in-memory storage (6 bytes)
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
#[repr(C, packed)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}
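
// The "6 bytes" above assumes `DocId` is a `u32`; this compile-time check keeps the
// packed layout honest if that ever changes.
#[cfg(feature = "native")]
const _: () = assert!(std::mem::size_of::<CompactPosting>() == 6);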

/// Metadata for a term's posting list (stored in hashmap, points into arena)
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct PostingListMeta {
    /// Start index of this term's run in the posting arena
    start: u32,
    /// Number of postings currently in the run
    len: u32,
    /// Capacity of the run (arena slots reserved for this term)
    cap: u32,
    /// Last doc_id for merging duplicate postings
    last_doc_id: DocId,
    /// Offset of the first spilled run in the spill file (-1 if none)
    spill_offset: i64,
    /// Number of postings in spill file
    spill_count: u32,
}

#[cfg(feature = "native")]
impl PostingListMeta {
    fn new(start: u32) -> Self {
        Self {
            start,
            len: 0,
            cap: 0,
            last_doc_id: u32::MAX,
            spill_offset: -1,
            spill_count: 0,
        }
    }

    fn total_count(&self) -> usize {
        self.len as usize + self.spill_count as usize
    }

    fn needs_spill(&self) -> bool {
        self.len as usize >= POSTING_FLUSH_THRESHOLD
    }
}

/// Arena for storing all postings contiguously
/// Eliminates millions of small Vec allocations
#[cfg(feature = "native")]
struct PostingArena {
    /// Single contiguous buffer for all postings
    data: Vec<CompactPosting>,
}

#[cfg(feature = "native")]
impl PostingArena {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
        }
    }

    /// Add a posting for a term, merging with the last posting when the doc id repeats.
    ///
    /// Each term owns a contiguous run `[start, start + cap)` in the arena. When the run
    /// fills up it is relocated to the end with doubled capacity, so `[start, start + len)`
    /// always contains only this term's postings even though many terms share the arena.
    #[inline]
    fn add(&mut self, meta: &mut PostingListMeta, doc_id: DocId, term_freq: u32) {
        // Check if we can merge with the last posting for this term
        if meta.len > 0 && meta.last_doc_id == doc_id {
            // Merge: update the last posting's term_freq
            let idx = meta.start as usize + meta.len as usize - 1;
            let posting = &mut self.data[idx];
            posting.term_freq = posting.term_freq.saturating_add(term_freq as u16);
            return;
        }

        // Run full (or brand new): relocate it to the end of the arena with doubled
        // capacity. The old slots become dead space, which is the price for avoiding
        // one Vec allocation per term.
        if meta.len == meta.cap {
            let new_cap = (meta.cap * 2).max(4);
            let new_start = self.data.len() as u32;
            for i in 0..meta.len as usize {
                let p = self.data[meta.start as usize + i];
                self.data.push(p);
            }
            let filler = CompactPosting {
                doc_id: 0,
                term_freq: 0,
            };
            self.data.resize(new_start as usize + new_cap as usize, filler);
            meta.start = new_start;
            meta.cap = new_cap;
        }

        // Write the new posting into the term's run
        self.data[meta.start as usize + meta.len as usize] = CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        };
        meta.len += 1;
        meta.last_doc_id = doc_id;
    }

    /// Get postings for a term
    fn get_postings(&self, meta: &PostingListMeta) -> &[CompactPosting] {
        let start = meta.start as usize;
        let end = start + meta.len as usize;
        &self.data[start..end]
    }

    /// Clear postings for a term (after spilling)
    fn clear_postings(&mut self, meta: &mut PostingListMeta) {
        // We can't reclaim the old run from the middle of the arena; the space is
        // "wasted" but avoids reallocation. Resetting `cap` makes the next add
        // allocate a fresh run at the end of the arena.
        meta.len = 0;
        meta.cap = 0;
        meta.start = self.data.len() as u32;
    }

    fn len(&self) -> usize {
        self.data.len()
    }
}

/// Statistics for debugging segment builder performance
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector
    pub doc_field_lengths_size: usize,
    /// Minimum number of terms in any shard
    pub shard_min: usize,
    /// Maximum number of terms in any shard
    pub shard_max: usize,
    /// Average number of terms per shard
    pub shard_avg: usize,
    /// Spill file offset (bytes written to disk)
    pub spill_bytes: u64,
}

/// Configuration for segment builder
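///
/// A typical override keeps the defaults and swaps the scratch directory
/// (sketch; the path is illustrative):
///
/// ```rust,ignore
/// let config = SegmentBuilderConfig {
///     temp_dir: std::path::PathBuf::from("/mnt/scratch"),
///     ..SegmentBuilderConfig::default()
/// };
/// ```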
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary files (streamed document store and posting spill)
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Sharded inverted index for better cache locality
    /// Each shard is a smaller hashmap that fits better in CPU cache
    inverted_index: ShardedInvertedIndex,

    /// Arena for all postings - single contiguous allocation
    posting_arena: PostingArena,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for large posting lists
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,
    /// Spilled runs per term as (offset, posting count) pairs, in write order.
    /// Runs of different terms interleave in the spill file, so each term's runs
    /// must be located individually when the segment is built.
    spill_runs: FxHashMap<TermKey, Vec<(u64, u32)>>,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
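    /// so the length of field `f` in document `d` lives at
    /// `d * num_indexed_fields + field_to_slot[&f]`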
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for terms that need spilling
    terms_to_spill_buffer: Vec<TermKey>,

    /// Reusable buffer for tokenization to avoid per-token String allocations
    token_buffer: String,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            posting_arena: PostingArena::with_capacity(config.posting_map_capacity * 4),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            spill_runs: FxHashMap::default(),
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Get current statistics for debugging performance
    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: Zero-allocation inline tokenization + term frequency aggregation.
    /// Instead of allocating a String per token, we:
    /// 1. Iterate over whitespace-split words
    /// 2. Build lowercase token in a reusable buffer
    /// 3. Intern directly from the buffer
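    ///
    /// e.g. `"Hello, HELLO world"` becomes the tokens `hello` (tf 2) and `world` (tf 1),
    /// for a returned token count of 3.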
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        // Zero-allocation tokenization: iterate words, lowercase inline, intern directly
        for word in text.split_whitespace() {
            // Build lowercase token in reusable buffer
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            // Intern the term directly from buffer - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        // Reuse buffer for terms to spill
        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let arena_len = self.posting_arena.len() as u32;
            let meta = self.inverted_index.get_or_insert(term_key, arena_len);
            self.posting_arena.add(meta, doc_id, tf);

            // Mark for spilling if needed
            if meta.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Phase 3: Spill large posting lists
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let arena_len = self.posting_arena.len() as u32;
        let meta = self.inverted_index.get_or_insert(term_key, arena_len);
        self.posting_arena.add(meta, doc_id, 1);

        Ok(())
    }

    /// Write document to streaming store
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spill a large posting list to disk
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // Initialize spill file if needed
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let meta = self.inverted_index.get_mut(&term_key).unwrap();
        let postings = self.posting_arena.get_postings(meta);

        // Record spill offset if this is the first spill for this term
        if meta.spill_offset < 0 {
            meta.spill_offset = self.spill_offset as i64;
        }

        // Remember where this run starts and how many postings it holds; runs of
        // different terms interleave in the spill file, so each run must be read
        // back individually when the segment is built.
        self.spill_runs
            .entry(term_key)
            .or_default()
            .push((self.spill_offset, meta.len));

        let spill_file = self.spill_file.as_mut().unwrap();

        // Write postings to spill file
        for p in postings {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4 bytes doc_id + 2 bytes term_freq
        }

        meta.spill_count += meta.len;

        // Clear postings in arena (mark as empty, point to end for future adds)
        self.posting_arena.clear_postings(meta);

        Ok(())
    }

    /// Build the final segment
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        // Build term dictionary and postings
        let (term_dict_data, postings_data) = self.build_postings()?;

        // Build document store from streamed data
        let store_data = self.build_store_from_stream()?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Build postings from inverted index
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // We need to sort terms for SSTable, so collect into BTreeMap
        // Key format: field_id (4 bytes) + term bytes
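        // e.g. field 3 + term "rust" => [0x03, 0x00, 0x00, 0x00, b'r', b'u', b's', b't']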
        let mut sorted_terms: BTreeMap<Vec<u8>, (TermKey, PostingListMeta)> = BTreeMap::new();

        for (term_key, meta) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, (*term_key, *meta));
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map spill file if it exists
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take()); // Close writer
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, (term_key, meta)) in sorted_terms {
            // Reconstruct full posting list
            let mut full_postings = PostingList::with_capacity(meta.total_count());

            // Read spilled runs first (they were written before the in-memory postings).
            // Runs of different terms interleave in the spill file, so each run is read
            // back via the per-term (offset, count) list recorded at spill time.
            if let Some(ref mmap) = spill_mmap
                && let Some(runs) = self.spill_runs.get(&term_key)
            {
                for &(run_offset, run_count) in runs {
                    let mut offset = run_offset as usize;
                    for _ in 0..run_count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            // Add in-memory postings from arena
            let arena_postings = self.posting_arena.get_postings(&meta);
            for p in arena_postings {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            // Build term info
            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Make sure every buffered byte has reached the temp file before mapping it.
        // The writer handle can stay open; we only read the flushed data below.
        self.store_file.flush()?;

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let file = File::open(&self.store_path)?;

        // memmap2 cannot map a zero-length file, so skip the replay when no documents
        // were streamed; the result is then just an empty, finalized store.
        if file.metadata()?.len() > 0 {
            let mmap = unsafe { memmap2::Mmap::map(&file)? };

            // Re-compress with proper block structure
            let mut offset = 0usize;
            while offset + 4 <= mmap.len() {
                let doc_len = u32::from_le_bytes([
                    mmap[offset],
                    mmap[offset + 1],
                    mmap[offset + 2],
                    mmap[offset + 3],
                ]) as usize;
                offset += 4;

                if offset + doc_len > mmap.len() {
                    break;
                }

                let doc_bytes = &mmap[offset..offset + doc_len];
                offset += doc_len;

                // Deserialize and re-store with proper compression
                if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                    store_writer.store(&doc, &self.schema)?;
                }
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}