hermes_core/segment/merger.rs

//! Segment merger for combining multiple segments

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io::Write;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PositionPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter,
    TERMINATED, TermInfo,
};

/// Write adapter that tracks bytes written.
///
/// Concrete type so it works with generic `serialize<W: Write>` functions
/// (unlike `dyn StreamingWriter` which isn't `Sized`).
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    offset: u64,
}

impl OffsetWriter {
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Current write position (total bytes written so far).
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Finalize the underlying streaming writer.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}

impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
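
// Sketch of how this file uses OffsetWriter (an internal pattern, not a public
// API): wrap a directory's streaming writer, sample `offset()` around each
// serialized structure to record (offset, len) pairs, then `finish()`.
//
//     let mut w = OffsetWriter::new(dir.streaming_writer(&path).await?);
//     let start = w.offset();
//     block.serialize(&mut w)?;          // any generic serialize<W: Write>
//     let len = (w.offset() - start) as u32;
//     w.finish()?;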

/// Format byte count as human-readable string
fn format_bytes(bytes: usize) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}

/// Compute per-segment doc ID offsets (each segment's docs start after the previous)
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}
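
// Worked example: segments with 10, 5, and 7 docs yield offsets [0, 10, 15];
// a local doc ID `d` in segment `i` becomes `d + offsets[i]` in the merged
// segment's global doc ID space.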

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Peak memory usage in bytes (estimated)
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes (estimated)
    pub current_memory_bytes: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
}

/// Entry for k-way merge heap
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse order for min-heap (BinaryHeap is max-heap by default)
        other.key.cmp(&self.key)
    }
}
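
// Example: pushing entries with keys b"b", b"a", b"c" and popping yields
// b"a", b"b", b"c" (smallest first); see the test sketch at the bottom of
// this file.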

/// Trained vector index structures for rebuilding segments with ANN indexes
pub struct TrainedVectorStructures {
    /// Trained centroids per field_id
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Trained PQ codebooks per field_id (for ScaNN)
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

/// Strategy for handling dense vector indexes during merge
pub enum DenseVectorStrategy<'a> {
    /// Merge existing indexes (ScaNN/IVF cluster merge or RaBitQ rebuild)
    MergeExisting,
    /// Build ANN indexes from flat vectors using trained structures
    BuildAnn(&'a TrainedVectorStructures),
}

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::MergeExisting,
        )
        .await
    }
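
    // Sketch of a typical call site. The surrounding names (`dir`, `readers`)
    // and the `SegmentId(7)` constructor form are assumptions for illustration:
    //
    //     let merger = SegmentMerger::new(schema.clone());
    //     let (meta, stats) = merger.merge(&dir, &readers, SegmentId(7)).await?;
    //     log::info!("merged {} docs into segment {}", meta.num_docs, meta.id);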

    /// Core merge: handles all mandatory parts (postings, positions, store, sparse, field stats, meta)
    /// and delegates dense vector handling to the provided strategy.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        dense_strategy: DenseVectorStrategy<'_>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === Phase 1: merge postings + positions (streaming) ===
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // === Phase 2: merge store files (streaming) ===
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        // === Dense vectors: strategy-dependent ===
        let vectors_bytes = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => {
                self.merge_dense_vectors(dir, segments, &files).await?
            }
            DenseVectorStrategy::BuildAnn(trained) => {
                self.build_ann_vectors(dir, segments, &files, trained)
                    .await?
            }
        };
        stats.vectors_bytes = vectors_bytes;

        // === Mandatory: merge sparse vectors ===
        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => "Merge",
            DenseVectorStrategy::BuildAnn(_) => "ANN merge",
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

    /// Merge postings from multiple segments using streaming k-way merge
    ///
    /// This implementation uses a min-heap to merge terms from all segments
    /// in sorted order without loading all terms into memory at once.
    /// Memory usage is O(num_segments) instead of O(total_terms).
    ///
    /// Optimization: when every source for a term stores external postings and
    /// the term appears in more than one segment, blocks are concatenated
    /// directly instead of decoded and re-encoded (see `merge_term`).
    ///
    /// Returns the number of terms processed.
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        // Bulk-prefetch all term dict blocks (1 I/O per segment instead of ~160)
        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Buffer term results - needed because SSTableWriter can't be held across await points.
        // Memory is bounded by unique terms (typically much smaller than postings).
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Check if other segments have the same key
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            // Process this term (handles both single-source and multi-source)
            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Track memory (only term_results is buffered; postings/positions stream to disk)
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

    /// Merge a single term's postings + positions from one or more source segments.
    ///
    /// Fast path: when all sources are external and there are multiple,
    /// uses block-level concatenation (O(blocks) instead of O(postings)).
    /// Otherwise: full decode → remap doc IDs → re-encode.
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        // === Merge postings ===
        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Decode all sources into a flat PostingList, remap doc IDs
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Try to inline (only when no positions)
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // === Merge positions (if any source has them) ===
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }
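
    // Worked example for the remap above: with segment doc counts [10, 5],
    // doc offsets are [0, 10], so a posting (doc 3, tf 2) from the second
    // segment is written as (doc 13, tf 2) in the merged posting list.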

    /// Merge dense vector indexes - returns output size in bytes
    ///
    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
    /// For RaBitQ: Must rebuild with new centroid from all vectors
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Check if all segments have ScaNN indexes for this field
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                // All segments have ScaNN - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Check if all segments have IVF indexes for this field
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                // All segments have IVF - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fall back to RaBitQ rebuild (collect raw vectors)
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
            }
        }

        write_vector_file(dir, files, field_indexes).await
    }

    /// Merge sparse vector indexes using block stacking
    ///
    /// This is O(blocks) instead of O(postings) - we stack blocks directly
    /// and only adjust the first_doc_id in each block header by the doc offset.
    /// Deltas within blocks remain unchanged since they're relative.
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        // Collect all sparse vector fields from schema
        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // Collect field data: (field_id, quantization, num_dims, dim_id -> merged posting bytes)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            // Get quantization from config
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Collect all dimensions across all segments for this field
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read ALL posting lists per segment in one I/O call each.
            // This replaces 80K+ individual get_posting() calls with ~4 bulk reads.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge from in-memory data — no I/O in this loop
            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Drop bulk data before accumulating output
            drop(segment_postings);

            // Store num_dims (active count) instead of max_dim_id
            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        // Sort by field_id
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Compute header size and per-dimension offsets before writing
        // Header: num_fields (4)
        // Per field: field_id (4) + quant (1) + num_dims (4) + table (16 * num_dims)
        let mut header_size = 4u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*num_dims as u64) * 16;
        }

        // Pre-compute offset tables (small — just dim_id + offset + length per dim)
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        // Stream header + tables + data directly to disk
        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Stream posting data per-field, per-dimension (drop each after writing)
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
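
    // On-disk sparse file layout produced above (little-endian):
    //   [num_fields: u32]
    //   per field: [field_id: u32][quantization: u8][num_dims: u32]
    //              + num_dims table entries of [dim_id: u32][offset: u64][length: u32]
    //   then the serialized posting lists for each field, in ascending dim_id order.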

    /// Merge segments and rebuild dense vectors with ANN indexes using trained structures.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::BuildAnn(trained),
        )
        .await
    }

    /// Build ANN indexes from Flat vectors using trained centroids
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            // Collect all Flat vectors from segments
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            // Extract just doc_ids for ANN indexes (they don't track ordinals yet)
            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            // Build ANN index based on index type and available trained structures
            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        // Create RaBitQ codebook for the dimension
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        // Build IVF-RaBitQ index
                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ

                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        // Build ScaNN (IVF-PQ) index
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN

                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            // Fallback: keep as Flat if no trained structures available
            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = flat_data.to_binary_bytes();
            field_indexes.push((field.0, 4u8, bytes)); // 4 = Flat Binary
        }

        write_vector_file(dir, files, field_indexes).await
    }
}

/// Write a vector index file with per-field header + data.
/// Streams header then each field's data directly to disk, avoiding a single
/// giant concatenation buffer.
async fn write_vector_file<D: Directory + DirectoryWriter>(
    dir: &D,
    files: &SegmentFiles,
    mut field_indexes: Vec<(u32, u8, Vec<u8>)>,
) -> Result<usize> {
    use byteorder::{LittleEndian, WriteBytesExt};

    if field_indexes.is_empty() {
        return Ok(0);
    }

    field_indexes.sort_by_key(|(id, _, _)| *id);

    let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

    // Header: num_fields + (field_id, index_type, offset, len) per field
    let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
    writer.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

    let mut current_offset = header_size as u64;
    for (field_id, index_type, data) in &field_indexes {
        writer.write_u32::<LittleEndian>(*field_id)?;
        writer.write_u8(*index_type)?;
        writer.write_u64::<LittleEndian>(current_offset)?;
        writer.write_u64::<LittleEndian>(data.len() as u64)?;
        current_offset += data.len() as u64;
    }

    // Stream each field's data (drop after writing)
    for (_, _, data) in field_indexes {
        writer.write_all(&data)?;
    }

    let output_size = writer.offset() as usize;
    writer.finish()?;
    Ok(output_size)
}
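
// On-disk vector file layout produced above (little-endian):
//   [num_fields: u32]
//   per field: [field_id: u32][index_type: u8][offset: u64][len: u64]
//   then each field's index bytes, in ascending field_id order.
// index_type tags used in this file: 0 = RaBitQ, 1 = IVF-RaBitQ, 2 = ScaNN,
// 4 = Flat Binary.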

/// Delete segment files from directory
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.positions).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}
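
// A minimal test sketch for the pure helpers in this file. The
// `TermInfo::external(offset, len, doc_count)` arguments are placeholders
// inferred from its use above; treat the exact signature as an assumption.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn format_bytes_picks_a_unit() {
        assert_eq!(format_bytes(512), "512 B");
        assert_eq!(format_bytes(2048), "2.00 KB");
        assert_eq!(format_bytes(3 * 1024 * 1024), "3.00 MB");
    }

    #[test]
    fn merge_entry_pops_smallest_key_first() {
        // BinaryHeap is a max-heap; MergeEntry reverses the key ordering so
        // that pop() yields keys in ascending (SSTable) order.
        let mut heap = BinaryHeap::new();
        for key in [b"b".to_vec(), b"a".to_vec(), b"c".to_vec()] {
            heap.push(MergeEntry {
                key,
                term_info: TermInfo::external(0, 0, 1),
                segment_idx: 0,
                doc_offset: 0,
            });
        }
        assert_eq!(heap.pop().unwrap().key, b"a".to_vec());
        assert_eq!(heap.pop().unwrap().key, b"b".to_vec());
        assert_eq!(heap.pop().unwrap().key, b"c".to_vec());
    }
}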