hermes_core/segment/builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Incremental posting flush**: Large posting lists flushed to temp file
//! - **Memory-mapped intermediate files**: Reduces memory pressure
//! - **Arena allocation**: Batch allocations for reduced fragmentation
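//!
//! A hedged usage sketch (the `schema`, `docs`, `dir`, and `segment_id` values
//! are assumed to be prepared by the caller):
//!
//! ```ignore
//! let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//! for doc in docs {
//!     builder.add_document(doc)?;
//! }
//! // Writes the term dictionary, postings, document store, and metadata to `dir`.
//! let meta = builder.build(&dir, segment_id).await?;
//! ```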

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Key, Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

/// Threshold for flushing a posting list to disk (number of postings)
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 100_000;

/// Size of the posting spill buffer before writing to disk
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Number of shards for the inverted index
/// Power of 2 for fast modulo via bitwise AND
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

/// Interned term key combining field and term
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: ShardedSpur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Get shard index for this term key (uses term's packed id for distribution)
    #[inline]
    fn shard(&self) -> usize {
        // Use the packed value for sharding (combines shard + local_id)
        // Using bitwise AND since NUM_INDEX_SHARDS is power of 2
        (self.term.packed as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

/// Sharded inverted index for better cache locality
/// Each shard is a smaller hashmap that fits better in CPU cache
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    /// Get or insert a posting list for the given term key
    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }

    /// Get total number of in-memory postings across all shards
    fn total_postings_in_memory(&self) -> usize {
        self.shards
            .iter()
            .flat_map(|s| s.values())
            .map(|p| p.memory.len())
            .sum()
    }

    /// Get shard size distribution (min, max, avg)
    fn shard_stats(&self) -> (usize, usize, usize) {
        let sizes: Vec<usize> = self.shards.iter().map(|s| s.len()).collect();
        let min = *sizes.iter().min().unwrap_or(&0);
        let max = *sizes.iter().max().unwrap_or(&0);
        let avg = if sizes.is_empty() {
            0
        } else {
            sizes.iter().sum::<usize>() / sizes.len()
        };
        (min, max, avg)
    }
}
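
// A minimal sanity sketch (assumed test-only): inserting the same `TermKey`
// twice reuses the existing posting list rather than creating a new one.
#[cfg(all(test, feature = "native"))]
mod sharded_index_tests {
    use super::*;

    #[test]
    fn get_or_insert_reuses_existing_list() {
        let mut interner = ShardedTermInterner::new();
        let mut index = ShardedInvertedIndex::new(16);
        let key = TermKey {
            field: 0,
            term: interner.get_or_intern("rust"),
        };
        index.get_or_insert(key).add(1, 1);
        index.get_or_insert(key).add(2, 1);
        assert_eq!(index.len(), 1);
        assert_eq!(index.total_postings_in_memory(), 2);
        assert!(index.get_mut(&key).is_some());
    }
}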

/// Iterator over all entries in the sharded index
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            // Move to next shard
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

/// Number of shards for the term interner
/// Power of 2 for fast modulo via bitwise AND
#[cfg(feature = "native")]
const NUM_INTERNER_SHARDS: usize = 64;

/// Sharded term ID combining shard index and local Spur
/// Layout: [8 bits shard][24 bits local_id]
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct ShardedSpur {
    /// Combined shard (high 8 bits) and local id (low 24 bits)
    packed: u32,
}

#[cfg(feature = "native")]
impl ShardedSpur {
    #[inline]
    fn new(shard: u8, local_spur: Spur) -> Self {
        // Store the 0-based key index (via `Key::into_usize`) so that `resolve`
        // can reconstruct the `Spur` with `Key::try_from_usize`
        let local_id = local_spur.into_usize() as u32;
        debug_assert!(local_id < (1 << 24), "Local spur ID overflow");
        Self {
            packed: ((shard as u32) << 24) | (local_id & 0x00FF_FFFF),
        }
    }

    #[inline]
    fn shard(self) -> usize {
        (self.packed >> 24) as usize
    }

    #[inline]
    fn local_id(self) -> u32 {
        self.packed & 0x00FF_FFFF
    }
}
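
// A small sanity sketch (assumed test-only): the shard index survives the
// pack/unpack round trip and the local id stays within its 24-bit budget.
#[cfg(all(test, feature = "native"))]
mod sharded_spur_tests {
    use super::*;

    #[test]
    fn pack_and_unpack() {
        let mut rodeo = Rodeo::new();
        let spur = rodeo.get_or_intern("term");
        let sharded = ShardedSpur::new(5, spur);
        assert_eq!(sharded.shard(), 5);
        assert!(sharded.local_id() < (1 << 24));
    }
}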

/// Sharded string interner for better cache locality with large term counts
/// Each shard is an independent Rodeo interner
#[cfg(feature = "native")]
struct ShardedTermInterner {
    shards: Vec<Rodeo>,
}

#[cfg(feature = "native")]
impl ShardedTermInterner {
    fn new() -> Self {
        let mut shards = Vec::with_capacity(NUM_INTERNER_SHARDS);
        for _ in 0..NUM_INTERNER_SHARDS {
            shards.push(Rodeo::new());
        }
        Self { shards }
    }

    /// Hash a string to determine its shard
    #[inline]
    fn shard_for_str(s: &str) -> usize {
        // Use FxHash for fast hashing
        use std::hash::{Hash, Hasher};
        let mut hasher = rustc_hash::FxHasher::default();
        s.hash(&mut hasher);
        (hasher.finish() as usize) & (NUM_INTERNER_SHARDS - 1)
    }

    /// Get or intern a string, returning a sharded spur
    #[inline]
    fn get_or_intern(&mut self, s: &str) -> ShardedSpur {
        let shard_idx = Self::shard_for_str(s);
        let local_spur = self.shards[shard_idx].get_or_intern(s);
        ShardedSpur::new(shard_idx as u8, local_spur)
    }

    /// Resolve a sharded spur back to its string
    #[inline]
    fn resolve(&self, spur: ShardedSpur) -> &str {
        self.shards[spur.shard()].resolve(&Spur::try_from_usize(spur.local_id() as usize).unwrap())
    }

    /// Total number of interned strings across all shards
    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }
}
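
// A minimal sanity sketch (assumed test-only): the same string interns to the
// same id, different strings get different ids, and `resolve` round-trips the
// original text.
#[cfg(all(test, feature = "native"))]
mod interner_tests {
    use super::*;

    #[test]
    fn intern_and_resolve_round_trip() {
        let mut interner = ShardedTermInterner::new();
        let a = interner.get_or_intern("hello");
        let b = interner.get_or_intern("hello");
        let c = interner.get_or_intern("world");
        assert!(a == b);
        assert!(a != c);
        assert_eq!(interner.resolve(a), "hello");
        assert_eq!(interner.resolve(c), "world");
        assert!(a.shard() < NUM_INTERNER_SHARDS);
        assert_eq!(interner.len(), 2);
    }
}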

/// Compact posting entry for in-memory storage
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16, // Most term frequencies fit in u16
}

/// Posting list that can spill to disk when too large
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// In-memory postings (hot data)
    memory: Vec<CompactPosting>,
    /// Batches already flushed to the spill file, as (byte offset, posting count)
    /// pairs in write order. A term can spill more than once, with other terms'
    /// batches interleaved in the shared file, so each batch is tracked separately.
    spill_ranges: Vec<(u64, u32)>,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_ranges: Vec::new(),
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_ranges: Vec::new(),
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Merge with last posting if same doc_id
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            // Clamp before narrowing so large increments saturate instead of wrapping
            let increment = term_freq.min(u16::MAX as u32) as u16;
            last.term_freq = last.term_freq.saturating_add(increment);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn total_count(&self) -> usize {
        let spilled: usize = self.spill_ranges.iter().map(|&(_, count)| count as usize).sum();
        self.memory.len() + spilled
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}
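
// A minimal sanity sketch (assumed test-only): postings for the same doc are
// merged in place, while a new doc id appends a new entry.
#[cfg(all(test, feature = "native"))]
mod posting_list_tests {
    use super::*;

    #[test]
    fn add_merges_consecutive_same_doc() {
        let mut list = SpillablePostingList::new();
        list.add(7, 2);
        list.add(7, 3); // same doc: term frequency accumulates
        list.add(9, 1); // new doc: new entry
        assert_eq!(list.memory.len(), 2);
        assert_eq!(list.memory[0].doc_id, 7);
        assert_eq!(list.memory[0].term_freq, 5);
        assert_eq!(list.memory[1].doc_id, 9);
        assert_eq!(list.total_count(), 2);
        assert!(!list.needs_spill());
    }
}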

/// Statistics for debugging segment builder performance
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents indexed
    pub num_docs: u32,
    /// Number of unique terms in the inverted index
    pub unique_terms: usize,
    /// Total postings in memory (across all terms)
    pub postings_in_memory: usize,
    /// Number of interned strings
    pub interned_strings: usize,
    /// Size of doc_field_lengths vector
    pub doc_field_lengths_size: usize,
    /// Minimum number of terms in a single shard
    pub shard_min: usize,
    /// Maximum number of terms in a single shard
    pub shard_max: usize,
    /// Average number of terms per shard
    pub shard_avg: usize,
    /// Spill file offset (bytes written to disk)
    pub spill_bytes: u64,
}

/// Configuration for segment builder
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
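
// A hedged configuration sketch (assumed test-only): struct-update syntax
// overrides selected fields and keeps the remaining defaults; the override
// values here are illustrative.
#[cfg(all(test, feature = "native"))]
mod config_tests {
    use super::*;

    #[test]
    fn override_defaults_with_struct_update() {
        let config = SegmentBuilderConfig {
            compression_level: CompressionLevel(3),
            num_compression_threads: 2,
            ..SegmentBuilderConfig::default()
        };
        assert_eq!(config.num_compression_threads, 2);
        assert_eq!(config.interner_capacity, 1_000_000);
    }
}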

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Sharded string interner for terms - O(1) lookup and deduplication
    /// Sharded for better cache locality with large term counts
    term_interner: ShardedTermInterner,

    /// Sharded inverted index for better cache locality
    /// Each shard is a smaller hashmap that fits better in CPU cache
    inverted_index: ShardedInvertedIndex,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for large posting lists
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<ShardedSpur, u32>,

    /// Reusable buffer for terms that need spilling
    terms_to_spill_buffer: Vec<TermKey>,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: ShardedTermInterner::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            config,
        })
    }

    /// Create with pre-computed WAND data
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Get current statistics for debugging performance
    pub fn stats(&self) -> SegmentBuilderStats {
        let (shard_min, shard_max, shard_avg) = self.inverted_index.shard_stats();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory: self.inverted_index.total_postings_in_memory(),
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
            shard_min,
            shard_max,
            shard_avg,
            spill_bytes: self.spill_offset,
        }
    }

    /// Add a document - streams to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: aggregate term frequencies within the document first,
    /// then do a single insert per unique term. This reduces hash lookups
    /// from O(total_tokens) to O(unique_terms_in_doc).
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        for token in tokens {
            // Intern the term - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&token.text);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        // Reuse buffer for terms to spill
        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            // Mark for spilling if needed
            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Phase 3: Spill large posting lists
        // (index-based loop: `spill_posting_list` takes `&mut self`, so we avoid
        // holding an iterator borrow of the buffer across the call)
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Write document to streaming store
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spill a large posting list to disk
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        // Initialize spill file if needed
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Record where this batch starts; batches for different terms can be
        // interleaved in the shared spill file, so each batch is tracked separately
        let batch_offset = self.spill_offset;

        // Write postings to spill file
        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4 bytes doc_id + 2 bytes term_freq
        }

        posting.spill_ranges.push((batch_offset, posting.memory.len() as u32));
        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4); // Keep some capacity

        Ok(())
    }

    /// Build the final segment
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        // Build term dictionary and postings
        let (term_dict_data, postings_data) = self.build_postings()?;

        // Build document store from streamed data
        let store_data = self.build_store_from_stream()?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Build postings from inverted index
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // We need to sort terms for SSTable, so collect into BTreeMap
        // Key format: field_id (4 bytes) + term bytes
        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map spill file if it exists
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take()); // Close writer
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            // Reconstruct full posting list
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Read spilled batches first (they hold earlier doc ids than the
            // in-memory postings)
            if let Some(ref mmap) = spill_mmap {
                for &(batch_offset, batch_count) in &spill_posting.spill_ranges {
                    let mut offset = batch_offset as usize;
                    for _ in 0..batch_count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            // Add in-memory postings
            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            // Build term info
            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Close the temp store writer (swap in a throwaway sink) so the file can
        // be reopened and memory-mapped below
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Re-compress with proper block structure
        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            // Deserialize and re-store with proper compression
            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}