
hermes_core/segment/merger.rs

//! Segment merger for combining multiple segments

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io::Write;
use std::mem::size_of;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PositionPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo,
};

/// Write adapter that tracks bytes written.
///
/// Concrete type so it works with generic `serialize<W: Write>` functions
/// (unlike `dyn StreamingWriter` which isn't `Sized`).
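///
/// A minimal usage sketch (hypothetical `dir` and `path`; mirrors how
/// `merge_core` drives it):
///
/// ```ignore
/// let mut w = OffsetWriter::new(dir.streaming_writer(&path).await?);
/// w.write_all(b"abc")?;
/// assert_eq!(w.offset(), 3); // offset tracks total bytes written
/// w.finish()?;               // finalize the underlying streaming writer
/// ```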
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    offset: u64,
}

impl OffsetWriter {
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Current write position (total bytes written so far).
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Finalize the underlying streaming writer.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}

impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// Format byte count as human-readable string
fn format_bytes(bytes: usize) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}

/// Compute per-segment doc ID offsets (each segment's docs start after the previous)
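///
/// For example, segments with 10, 5, and 7 docs yield `[0, 10, 15]`;
/// a doc's merged ID is its local ID plus its segment's offset.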
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Peak memory usage in bytes (estimated)
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes (estimated)
    pub current_memory_bytes: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
}

/// Entry for k-way merge heap
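///
/// Ordering is reversed (see `Ord` below) so `BinaryHeap::pop` yields the
/// lexicographically smallest key first: keys `"b"`, `"a"`, `"c"` pop as
/// `"a"`, `"b"`, `"c"`.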
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse order for min-heap (BinaryHeap is max-heap by default)
        other.key.cmp(&self.key)
    }
}

/// Trained vector index structures for rebuilding segments with ANN indexes
pub struct TrainedVectorStructures {
    /// Trained centroids per field_id
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Trained PQ codebooks per field_id (for ScaNN)
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, None).await
    }

    /// Merge segments with trained ANN structures available.
    ///
    /// Dense vectors use O(1) cluster merge when possible (homogeneous IVF/ScaNN),
    /// otherwise extracts raw vectors from all index types and rebuilds with
    /// the provided trained structures.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, Some(trained))
            .await
    }

    /// Core merge: handles all mandatory parts (postings, positions, store, sparse, field stats, meta)
    /// and delegates dense vector handling based on available trained structures.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === Phase 1: merge postings + positions (streaming) ===
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // === Phase 2: merge store files (streaming) ===
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        // === Dense vectors ===
        let vectors_bytes = self
            .merge_dense_vectors(dir, segments, &files, trained)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // === Mandatory: merge sparse vectors ===
        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }
    /// Merge postings from multiple segments using a streaming k-way merge
    ///
    /// A min-heap holding one entry per segment yields terms from all segments
    /// in sorted order without loading every term into memory at once, so the
    /// merge state is O(num_segments); only the compact (key, TermInfo) results
    /// are buffered for the term dictionary.
    ///
    /// Optimization: when a term appears in multiple segments and all of its
    /// postings are stored externally, `merge_term` concatenates their blocks
    /// directly instead of decoding and re-encoding each posting.
    ///
    /// Returns the number of terms processed.
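    ///
    /// Sketch for two segments with terms {a, b} and {b, c}: pop `a` (segment 0
    /// only) and write it; pop `b` from both segments and merge their postings
    /// with doc offsets applied; pop `c` (segment 1 only) and write it.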
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        // Bulk-prefetch all term dict blocks (1 I/O per segment instead of ~160)
        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Buffer term results - needed because SSTableWriter can't be held across await points
        // Memory is bounded by unique terms (typically much smaller than postings)
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Check if other segments have the same key
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            // Process this term (handles both single-source and multi-source)
            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Track memory (only term_results is buffered; postings/positions stream to disk)
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

    /// Merge a single term's postings + positions from one or more source segments.
    ///
    /// Fast path: when all sources are external and there are multiple,
    /// uses block-level concatenation (O(blocks) instead of O(postings)).
    /// Otherwise: full decode → remap doc IDs → re-encode.
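    ///
    /// E.g. with doc offsets [0, 100], a posting for local doc 7 in segment 1
    /// becomes doc 107 in the merged list; the fast path applies the same
    /// remap at block granularity rather than per posting.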
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        // === Merge postings ===
        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Decode all sources into a flat PostingList, remap doc IDs
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Try to inline (only when no positions)
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // === Merge positions (if any source has them) ===
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }

    /// Merge dense vector indexes - returns output size in bytes
    ///
    /// Strategy:
    /// 1. O(1) cluster merge when all segments have the same ANN type (IVF/ScaNN)
    /// 2. Fallback: extract raw vectors from ALL index types (Flat, RaBitQ, IVF clusters)
    ///    - If trained structures available → rebuild as IVF/ScaNN
    ///    - Otherwise → keep as Flat
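    ///
    /// The per-field index-type tag written to the vectors file is 1 for
    /// IVF-RaBitQ, 2 for ScaNN, and 4 for Flat (see `write_vector_file`).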
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            // --- Fast path: O(1) cluster merge for homogeneous ScaNN ---
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            let segments_with_vectors = segments
                .iter()
                .filter(|s| s.has_dense_vector_index(field))
                .count();

            if scann_indexes.len() == segments_with_vectors && !scann_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // --- Fast path: O(1) cluster merge for homogeneous IVF-RaBitQ ---
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len() == segments_with_vectors && !ivf_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // --- Fallback: collect raw vectors from ALL index types ---
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let doc_offs = doc_offsets(segments);

            for (seg_idx, segment) in segments.iter().enumerate() {
                let offset = doc_offs[seg_idx];
                match segment.vector_indexes().get(&field.0) {
                    Some(super::VectorIndex::Flat(flat_data)) => {
                        for (vec, &(local_doc_id, ordinal)) in
                            flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                        {
                            all_vectors.push(vec.clone());
                            all_doc_ids.push((offset + local_doc_id, ordinal));
                        }
                    }
                    Some(super::VectorIndex::RaBitQ(index)) => {
                        if let Some(raw_vecs) = &index.raw_vectors {
                            for (i, vec) in raw_vecs.iter().enumerate() {
                                all_vectors.push(vec.clone());
                                all_doc_ids.push((offset + i as u32, 0));
                            }
                        }
                    }
                    Some(super::VectorIndex::IVF { index, .. }) => {
                        // Extract raw vectors from IVF-RaBitQ clusters
                        for cluster in index.clusters.clusters.values() {
                            if let Some(ref raw_vecs) = cluster.raw_vectors {
                                for (i, raw) in raw_vecs.iter().enumerate() {
                                    all_vectors.push(raw.clone());
                                    all_doc_ids
                                        .push((offset + cluster.doc_ids[i], cluster.ordinals[i]));
                                }
                            }
                        }
                    }
                    // ScaNN (IVF-PQ) stores only PQ codes, no raw vectors — skip
                    _ => {}
                }
            }

            if all_vectors.is_empty() {
                continue;
            }

            let config = entry.dense_vector_config.as_ref();
            let dim = config
                .map(|c| c.index_dim())
                .unwrap_or(all_vectors[0].len());

            // Try to rebuild as ANN if trained structures are available
            let mut built_ann = false;
            if let (Some(trained), Some(config)) = (trained, config) {
                match config.index_type {
                    VectorIndexType::IvfRaBitQ => {
                        if let Some(centroids) = trained.centroids.get(&field.0) {
                            let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                            let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
                            let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                                .with_store_raw(config.store_raw);
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_config,
                                centroids,
                                &codebook,
                                &all_vectors,
                                Some(&all_doc_ids),
                            );
                            let index_data = super::builder::IVFRaBitQIndexData {
                                centroids: (**centroids).clone(),
                                codebook,
                                index: ivf_index,
                            };
                            let bytes = index_data
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            field_indexes.push((field.0, 1u8, bytes));
                            built_ann = true;
                            log::info!(
                                "Rebuilt IVF-RaBitQ for field {} ({} vectors)",
                                field.0,
                                all_vectors.len()
                            );
                        }
                    }
                    VectorIndexType::ScaNN => {
                        if let (Some(centroids), Some(codebook)) = (
                            trained.centroids.get(&field.0),
                            trained.codebooks.get(&field.0),
                        ) {
                            let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                            let ivf_pq_index = crate::structures::IVFPQIndex::build(
                                ivf_pq_config,
                                centroids,
                                codebook,
                                &all_vectors,
                                Some(&all_doc_ids),
                            );
                            let index_data = super::builder::ScaNNIndexData {
                                centroids: (**centroids).clone(),
                                codebook: (**codebook).clone(),
                                index: ivf_pq_index,
                            };
                            let bytes = index_data
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            field_indexes.push((field.0, 2u8, bytes));
                            built_ann = true;
                            log::info!(
                                "Rebuilt ScaNN for field {} ({} vectors)",
                                field.0,
                                all_vectors.len()
                            );
                        }
                    }
                    _ => {}
                }
            }

            // Fall back to Flat if no ANN rebuild happened
            if !built_ann {
                let flat_data = super::vector_data::FlatVectorData {
                    dim,
                    vectors: all_vectors,
                    doc_ids: all_doc_ids,
                };
                let bytes = flat_data.to_binary_bytes();
                field_indexes.push((field.0, 4u8, bytes));
            }
        }

        write_vector_file(dir, files, field_indexes).await
    }

    /// Merge sparse vector indexes using block stacking
    ///
    /// This is O(blocks) instead of O(postings) - we stack blocks directly
    /// and only adjust the first_doc_id in each block header by the doc offset.
    /// Deltas within blocks remain unchanged since they're relative.
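    ///
    /// E.g. a block with `first_doc_id = 5` taken from a segment whose doc
    /// offset is 100 is stacked with `first_doc_id = 105`; the delta-encoded
    /// doc IDs inside the block are untouched.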
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        // Collect all sparse vector fields from schema
        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }
        // Collect field data: (field_id, quantization, num_dims, dim_id -> merged posting bytes)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            // Get quantization from config
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Collect all dimensions across all segments for this field
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read ALL posting lists per segment in one I/O call each.
            // This replaces 80K+ individual get_posting() calls with ~4 bulk reads.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge from in-memory data — no I/O in this loop
            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Drop bulk data before accumulating output
            drop(segment_postings);

            // Store num_dims (active count) instead of max_dim_id
            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        // Sort by field_id
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Compute header size and per-dimension offsets before writing
        // num_fields(u32) + per-field: field_id(u32) + quant(u8) + num_dims(u32)
        // per-dim: dim_id(u32) + offset(u64) + length(u32)
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
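        // Concretely: each dim entry is 16 bytes (4 + 8 + 4) and each
        // per-field header is 9 bytes (4 + 1 + 4), after a 4-byte field count.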
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += per_field_header as u64;
            header_size += (*num_dims as u64) * per_dim_entry as u64;
        }

        // Pre-compute offset tables (small — just dim_id + offset + length per dim)
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        // Stream header + tables + data directly to disk
        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Stream posting data per-field, per-dimension (drop each after writing)
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
}

/// Write a vector index file with per-field header + data.
/// Streams header then each field's data directly to disk, avoiding a single
/// giant concatenation buffer.
async fn write_vector_file<D: Directory + DirectoryWriter>(
    dir: &D,
    files: &SegmentFiles,
    mut field_indexes: Vec<(u32, u8, Vec<u8>)>,
) -> Result<usize> {
    use byteorder::{LittleEndian, WriteBytesExt};

    if field_indexes.is_empty() {
        return Ok(0);
    }

    field_indexes.sort_by_key(|(id, _, _)| *id);

    let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

    // num_fields(u32) + per-field: field_id(u32) + index_type(u8) + offset(u64) + length(u64)
    let per_field_entry = size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
    let header_size = size_of::<u32>() + field_indexes.len() * per_field_entry;
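    // Concretely: the header is 4 bytes (field count) plus 21 bytes
    // (4 + 1 + 8 + 8) per field; data regions follow at the recorded offsets.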
    writer.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

    let mut current_offset = header_size as u64;
    for (field_id, index_type, data) in &field_indexes {
        writer.write_u32::<LittleEndian>(*field_id)?;
        writer.write_u8(*index_type)?;
        writer.write_u64::<LittleEndian>(current_offset)?;
        writer.write_u64::<LittleEndian>(data.len() as u64)?;
        current_offset += data.len() as u64;
    }

    // Stream each field's data (drop after writing)
    for (_, _, data) in field_indexes {
        writer.write_all(&data)?;
    }

    let output_size = writer.offset() as usize;
    writer.finish()?;
    Ok(output_size)
}

/// Delete segment files from directory
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.positions).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}
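
// A minimal sanity check for the formatting helper above; a sketch assuming
// the crate's standard `cargo test` setup, not an exhaustive suite.
#[cfg(test)]
mod tests {
    use super::format_bytes;

    #[test]
    fn format_bytes_uses_binary_thresholds() {
        // Below 1 KiB: raw byte count
        assert_eq!(format_bytes(512), "512 B");
        // 1536 / 1024 = 1.5
        assert_eq!(format_bytes(1536), "1.50 KB");
        assert_eq!(format_bytes(1024 * 1024), "1.00 MB");
        assert_eq!(format_bytes(3 * 1024 * 1024 * 1024), "3.00 GB");
    }
}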