hermes_core/segment/
builder.rs

//! Streaming segment builder with optimized memory usage
//!
//! Key optimizations:
//! - **String interning**: Terms are interned using `lasso` to avoid repeated allocations
//! - **hashbrown HashMap**: O(1) average insertion instead of BTreeMap's O(log n)
//! - **Streaming document store**: Documents written to disk immediately
//! - **Incremental posting flush**: Large posting lists flushed to temp file
//! - **Memory-mapped intermediate files**: Reduces memory pressure
//! - **Arena allocation**: Batch allocations for reduced fragmentation
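//!
//! # Example (sketch)
//!
//! A minimal sketch of the intended flow, assuming the `Schema`, the documents, and a
//! `Directory` implementation are constructed elsewhere (their APIs are not defined in
//! this module, and the import paths are assumptions):
//!
//! ```ignore
//! use hermes_core::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMeta};
//!
//! async fn build_segment<D: Directory + DirectoryWriter>(
//!     schema: Schema,
//!     docs: Vec<Document>,
//!     dir: &D,
//!     segment_id: SegmentId,
//! ) -> Result<SegmentMeta> {
//!     let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//!     for doc in docs {
//!         builder.add_document(doc)?;
//!     }
//!     builder.build(dir, segment_id).await
//! }
//! ```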

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

/// Threshold for flushing a posting list to disk (number of postings)
#[cfg(feature = "native")]
const POSTING_FLUSH_THRESHOLD: usize = 100_000;

/// Size of the posting spill buffer before writing to disk
#[cfg(feature = "native")]
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB

/// Number of shards for the inverted index
/// Power of 2 for fast modulo via bitwise AND
#[cfg(feature = "native")]
const NUM_INDEX_SHARDS: usize = 64;

/// Interned term key combining field and term
#[cfg(feature = "native")]
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
impl TermKey {
    /// Get shard index for this term key (uses term's internal id for distribution)
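    ///
    /// A quick illustration of the masking trick (relying on `NUM_INDEX_SHARDS` being a
    /// power of two):
    ///
    /// ```text
    /// 1_000_003 & 63  ==  1_000_003 % 64  ==  3
    /// ```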
    #[inline]
    fn shard(&self) -> usize {
        // Spur wraps NonZero<u32>, get the raw value for sharding
        // Using bitwise AND since NUM_INDEX_SHARDS is power of 2
        (self.term.into_inner().get() as usize) & (NUM_INDEX_SHARDS - 1)
    }
}

/// Sharded inverted index for better cache locality
/// Each shard is a smaller hashmap that fits better in CPU cache
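/// A term key always routes to the same shard via [`TermKey::shard`], so a lookup or
/// insert touches exactly one of the `NUM_INDEX_SHARDS` maps.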
#[cfg(feature = "native")]
struct ShardedInvertedIndex {
    shards: Vec<HashMap<TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl ShardedInvertedIndex {
    fn new(capacity_per_shard: usize) -> Self {
        let mut shards = Vec::with_capacity(NUM_INDEX_SHARDS);
        for _ in 0..NUM_INDEX_SHARDS {
            shards.push(HashMap::with_capacity(capacity_per_shard));
        }
        Self { shards }
    }

    #[inline]
    fn get_mut(&mut self, key: &TermKey) -> Option<&mut SpillablePostingList> {
        self.shards[key.shard()].get_mut(key)
    }

    /// Get or insert a posting list for the given term key
    #[inline]
    fn get_or_insert(&mut self, key: TermKey) -> &mut SpillablePostingList {
        self.shards[key.shard()]
            .entry(key)
            .or_insert_with(SpillablePostingList::new)
    }

    #[allow(dead_code)]
    fn len(&self) -> usize {
        self.shards.iter().map(|s| s.len()).sum()
    }
}

/// Iterator over all entries in the sharded index
#[cfg(feature = "native")]
struct ShardedIndexIter<'a> {
    shards: std::slice::Iter<'a, HashMap<TermKey, SpillablePostingList>>,
    current: Option<hashbrown::hash_map::Iter<'a, TermKey, SpillablePostingList>>,
}

#[cfg(feature = "native")]
impl<'a> Iterator for ShardedIndexIter<'a> {
    type Item = (&'a TermKey, &'a SpillablePostingList);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(ref mut current) = self.current
                && let Some(item) = current.next()
            {
                return Some(item);
            }
            // Move to next shard
            match self.shards.next() {
                Some(shard) => self.current = Some(shard.iter()),
                None => return None,
            }
        }
    }
}

#[cfg(feature = "native")]
impl<'a> IntoIterator for &'a ShardedInvertedIndex {
    type Item = (&'a TermKey, &'a SpillablePostingList);
    type IntoIter = ShardedIndexIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        ShardedIndexIter {
            shards: self.shards.iter(),
            current: None,
        }
    }
}

/// Compact posting entry for in-memory storage
#[cfg(feature = "native")]
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16, // Most term frequencies fit in u16
}

/// Posting list that can spill to disk when too large
#[cfg(feature = "native")]
struct SpillablePostingList {
    /// In-memory postings (hot data)
    memory: Vec<CompactPosting>,
    /// Spilled runs in the spill file, as `(byte_offset, posting_count)` pairs.
    /// A term can spill more than once and runs from other terms may be written
    /// in between, so each run is tracked separately rather than as a single
    /// contiguous offset/count.
    spill_chunks: Vec<(u64, u32)>,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spill_chunks: Vec::new(),
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spill_chunks: Vec::new(),
        }
    }

    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Merge with last posting if same doc_id
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn total_count(&self) -> usize {
        let spilled: usize = self.spill_chunks.iter().map(|&(_, count)| count as usize).sum();
        self.memory.len() + spilled
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

/// Configuration for segment builder
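///
/// A sketch of overriding a couple of fields while keeping the remaining defaults
/// (struct-update syntax; the types are assumed to be in scope at the call site):
///
/// ```ignore
/// let config = SegmentBuilderConfig {
///     temp_dir: std::path::PathBuf::from("/var/tmp/hermes"),
///     compression_level: CompressionLevel(3),
///     ..SegmentBuilderConfig::default()
/// };
/// ```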
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for temporary spill files
    pub temp_dir: PathBuf,
    /// Compression level for document store
    pub compression_level: CompressionLevel,
    /// Number of threads for parallel compression
    pub num_compression_threads: usize,
    /// Initial capacity for term interner
    pub interner_capacity: usize,
    /// Initial capacity for posting lists hashmap
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

/// Segment builder with optimized memory usage
///
/// Features:
/// - Streams documents to disk immediately (no in-memory document storage)
/// - Uses string interning for terms (reduced allocations)
/// - Uses hashbrown HashMap (faster than BTreeMap)
/// - Spills large posting lists to disk (bounded memory)
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// String interner for terms - O(1) lookup and deduplication
    term_interner: Rodeo,

    /// Sharded inverted index for better cache locality
    /// Each shard is a smaller hashmap that fits better in CPU cache
    inverted_index: ShardedInvertedIndex,

    /// Streaming document store writer
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Spill file for large posting lists
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    spill_offset: u64,

    /// Document count
    next_doc_id: DocId,

    /// Per-field statistics for BM25F
    field_stats: FxHashMap<u32, FieldStats>,

    /// Per-document field lengths stored compactly
    /// Uses a flat Vec instead of Vec<HashMap> for better cache locality
    /// Layout: [doc0_field0_len, doc0_field1_len, ..., doc1_field0_len, ...]
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional pre-computed WAND data for IDF values
    wand_data: Option<Arc<WandData>>,

    /// Reusable buffer for per-document term frequency aggregation
    /// Avoids allocating a new hashmap for each document
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Reusable buffer for terms that need spilling
    terms_to_spill_buffer: Vec<TermKey>,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Create a new segment builder
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Count indexed fields for compact field length storage
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: ShardedInvertedIndex::new(
                config.posting_map_capacity / NUM_INDEX_SHARDS,
            ),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            terms_to_spill_buffer: Vec::new(),
            config,
        })
    }

    /// Create with pre-computed WAND data
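    ///
    /// A call-site sketch, assuming an `Arc<WandData>` was computed elsewhere:
    ///
    /// ```ignore
    /// let builder = SegmentBuilder::with_wand_data(schema, config, wand_data)?;
    /// ```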
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Add a document, streaming it to disk immediately
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Initialize field lengths for this document
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    // Update field statistics
                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    // Store field length compactly
                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        // Stream document to disk immediately
        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Index a text field using interned terms
    ///
    /// Optimization: aggregate term frequencies within the document first,
    /// then do a single insert per unique term. This reduces hash lookups
    /// from O(total_tokens) to O(unique_terms_in_doc).
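    ///
    /// For example, with a simple lowercasing tokenizer:
    ///
    /// ```text
    /// "to be or not to be"  ->  local_tf_buffer = { to: 2, be: 2, or: 1, not: 1 }
    /// 6 tokens, but only 4 inverted-index insertions
    /// ```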
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        // Phase 1: Aggregate term frequencies within this document
        // Reuse buffer to avoid allocations
        self.local_tf_buffer.clear();

        for token in tokens {
            // Intern the term - O(1) amortized
            let term_spur = self.term_interner.get_or_intern(&token.text);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        // Phase 2: Insert aggregated terms into inverted index
        // Now we only do one inverted_index lookup per unique term in doc
        // Reuse buffer for terms to spill
        let field_id = field.0;
        self.terms_to_spill_buffer.clear();

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self.inverted_index.get_or_insert(term_key);
            posting.add(doc_id, tf);

            // Mark for spilling if needed
            if posting.needs_spill() {
                self.terms_to_spill_buffer.push(term_key);
            }
        }

        // Phase 3: Spill large posting lists
        for i in 0..self.terms_to_spill_buffer.len() {
            let term_key = self.terms_to_spill_buffer[i];
            self.spill_posting_list(term_key)?;
        }

        Ok(token_count)
    }

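    /// Index a numeric field value as a synthetic term.
    ///
    /// With the encoding below, the u64 value `42` is indexed as the interned
    /// term `"__num_42"` for the given field.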
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        // For numeric fields, we use a special encoding
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self.inverted_index.get_or_insert(term_key);
        posting.add(doc_id, 1);

        Ok(())
    }

    /// Write document to streaming store
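    ///
    /// Each record appended to the temporary store file is a length-prefixed blob:
    ///
    /// ```text
    /// [len: u32 LE][serialized document bytes ...]
    /// ```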
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spill a large posting list to disk
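    ///
    /// Spill records are fixed width, 6 bytes per posting:
    ///
    /// ```text
    /// [doc_id: u32 LE][term_freq: u16 LE]
    /// ```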
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        // Initialize spill file if needed
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Record this run as its own chunk. A term can spill several times, and runs
        // from other terms may land between its chunks, so the chunks are not
        // necessarily contiguous in the spill file.
        posting
            .spill_chunks
            .push((self.spill_offset, posting.memory.len() as u32));

        // Write postings to spill file
        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6; // 4 bytes doc_id + 2 bytes term_freq
        }

        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4); // Keep some capacity

        Ok(())
    }

    /// Build the final segment
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush any buffered data
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        // Build term dictionary and postings
        let (term_dict_data, postings_data) = self.build_postings()?;

        // Build document store from streamed data
        let store_data = self.build_store_from_stream()?;

        // Write to directory
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Cleanup temp files
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Build postings from inverted index
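    ///
    /// Term-dictionary keys are the field id followed by the raw term bytes:
    ///
    /// ```text
    /// [field_id: u32 LE][term bytes ...]
    /// ```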
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // We need to sort terms for SSTable, so collect into BTreeMap
        // Key format: field_id (4 bytes) + term bytes
        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Memory-map spill file if it exists
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take()); // Close writer
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            // Reconstruct full posting list
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Read spilled chunks first (they hold earlier doc ids than the in-memory tail)
            if let Some(ref mmap) = spill_mmap {
                for &(chunk_offset, chunk_count) in &spill_posting.spill_chunks {
                    let mut offset = chunk_offset as usize;
                    for _ in 0..chunk_count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            // Add in-memory postings
            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            // Build term info
            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Build document store from streamed temp file
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Make sure all buffered document records are on disk, then memory-map the
        // temp store file for re-compression (portable alternative to swapping the
        // writer out for a throwaway /dev/null handle).
        self.store_file.flush()?;

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        // Re-compress with proper block structure
        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            // Deserialize and re-store with proper compression
            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Cleanup temp files on drop
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}