hermes_core/segment/merger.rs

//! Segment merger for combining multiple segments

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io::Write;
use std::mem::size_of;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PositionPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo,
};

/// Write adapter that tracks bytes written.
///
/// Concrete type so it works with generic `serialize<W: Write>` functions
/// (unlike `dyn StreamingWriter` which isn't `Sized`).
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    offset: u64,
}

impl OffsetWriter {
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Current write position (total bytes written so far).
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Finalize the underlying streaming writer.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}

impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// Format a byte count as a human-readable string
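/// (e.g. `format_bytes(2048)` returns `"2.00 KB"`; values below 1024 print as bytes)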
fn format_bytes(bytes: usize) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}

/// Compute per-segment doc ID offsets (each segment's docs start after the previous)
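/// (e.g. segments with 10, 25, and 5 docs yield offsets `[0, 10, 35]`)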
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Peak memory usage in bytes (estimated)
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes (estimated)
    pub current_memory_bytes: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
}

/// Entry for k-way merge heap
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse order for min-heap (BinaryHeap is max-heap by default)
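        // e.g. after pushing entries with keys b"b" and b"a", `pop()` yields b"a" first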
        other.key.cmp(&self.key)
    }
}

/// Trained vector index structures for rebuilding segments with ANN indexes
pub struct TrainedVectorStructures {
    /// Trained centroids per field_id
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Trained PQ codebooks per field_id (for ScaNN)
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, None).await
    }

    /// Merge segments with trained ANN structures available.
    ///
    /// Dense vectors use O(1) cluster merge when possible (homogeneous IVF/ScaNN);
    /// otherwise raw vectors are extracted from all index types and the index is
    /// rebuilt with the provided trained structures.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, Some(trained))
            .await
    }

    /// Core merge: handles all mandatory parts (postings, positions, store, sparse, field stats, meta)
    /// and delegates dense vector handling based on available trained structures.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === Phase 1: merge postings + positions (streaming) ===
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // === Phase 2: merge store files (streaming) ===
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        // === Dense vectors ===
        let vectors_bytes = self
            .merge_dense_vectors(dir, segments, &files, trained)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // === Mandatory: merge sparse vectors ===
        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

    /// Merge postings from multiple segments using a streaming k-way merge.
    ///
    /// A min-heap holding one entry per segment yields terms in sorted order,
    /// so merge state is O(num_segments) rather than O(total_terms); postings
    /// and positions stream straight to disk, and only the output
    /// (key, TermInfo) pairs are buffered for the final SSTable write.
    ///
    /// Optimization: terms whose sources are all external block posting lists
    /// are merged by block-level concatenation without a decode/encode round
    /// trip; terms with inline postings or a single source take the full
    /// decode → remap → re-encode path in `merge_term`.
    ///
    /// Returns the number of terms processed.
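    ///
    /// A toy sketch of the k-way pattern (hypothetical data, no segment I/O;
    /// `std::cmp::Reverse` stands in for `MergeEntry`'s reversed `Ord`):
    ///
    /// ```ignore
    /// use std::{cmp::Reverse, collections::BinaryHeap};
    ///
    /// let mut iters = vec![vec!["a", "c"].into_iter(), vec!["b"].into_iter()];
    /// let mut heap = BinaryHeap::new();
    /// for (i, it) in iters.iter_mut().enumerate() {
    ///     if let Some(k) = it.next() { heap.push(Reverse((k, i))); }
    /// }
    /// let mut order = Vec::new();
    /// while let Some(Reverse((k, i))) = heap.pop() {
    ///     order.push(k); // smallest outstanding key across all iterators
    ///     if let Some(k) = iters[i].next() { heap.push(Reverse((k, i))); }
    /// }
    /// assert_eq!(order, ["a", "b", "c"]);
    /// ```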
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        // Bulk-prefetch all term dict blocks (1 I/O per segment instead of ~160)
        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Buffer term results - needed because SSTableWriter can't be held across await points
        // Memory is bounded by unique terms (typically much smaller than postings)
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Check if other segments have the same key
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            // Process this term (handles both single-source and multi-source)
            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Track memory (only term_results is buffered; postings/positions stream to disk)
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

    /// Merge a single term's postings + positions from one or more source segments.
    ///
    /// Fast path: when all sources are external and there are multiple,
    /// uses block-level concatenation (O(blocks) instead of O(postings)).
    /// Otherwise: full decode → remap doc IDs → re-encode.
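    ///
    /// Doc-ID remap example: if segment 0 holds 100 docs, segment 1's local
    /// doc 3 becomes doc 103 in the merged list (its `doc_offset` is 100).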
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        // === Merge postings ===
        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Decode all sources into a flat PostingList, remap doc IDs
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Try to inline (only when no positions)
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // === Merge positions (if any source has them) ===
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }

    /// Iterate over all vectors in a segment for a given field, calling `f` for each.
    /// Handles Flat, RaBitQ, and IVF index types uniformly.
    /// `doc_id_offset` is added to each doc_id for multi-segment merges.
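    /// (Note: the RaBitQ arm assumes one vector per doc, emitting ordinal 0 and
    /// the vector's position in the raw list as its doc_id, since that list
    /// carries no doc-id mapping of its own.)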
    fn for_each_vector(
        segment: &SegmentReader,
        field: crate::dsl::Field,
        doc_id_offset: u32,
        mut f: impl FnMut(u32, u16, &[f32]),
    ) {
        match segment.vector_indexes().get(&field.0) {
            Some(super::VectorIndex::Flat(flat_data)) => {
                for i in 0..flat_data.num_vectors() {
                    let (doc_id, ordinal) = flat_data.get_doc_id(i);
                    f(doc_id_offset + doc_id, ordinal, flat_data.get_vector(i));
                }
            }
            Some(super::VectorIndex::RaBitQ(index)) => {
                if let Some(raw_vecs) = &index.raw_vectors {
                    for (i, vec) in raw_vecs.iter().enumerate() {
                        f(doc_id_offset + i as u32, 0, vec);
                    }
                }
            }
            Some(super::VectorIndex::IVF { index, .. }) => {
                for cluster in index.clusters.clusters.values() {
                    if let Some(ref raw_vecs) = cluster.raw_vectors {
                        for (i, raw) in raw_vecs.iter().enumerate() {
                            f(doc_id_offset + cluster.doc_ids[i], cluster.ordinals[i], raw);
                        }
                    }
                }
            }
            _ => {}
        }
    }

    /// Merge dense vector indexes - returns output size in bytes
    ///
    /// Strategy:
    /// 1. O(1) cluster merge when all segments have the same ANN type (IVF/ScaNN)
    /// 2. ANN rebuild if trained structures are available — vectors stream from
    ///    segments into the new index, so only the index itself is held in memory
    /// 3. Flat streaming merge — streams vectors directly from segments to disk
    ///    without collecting them in memory (O(1) extra memory per field)
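    ///
    /// On-disk layout of the vectors file (mirrors the writer at the end of this
    /// function): `[num_fields: u32]`, then per field `[field_id: u32]`
    /// `[index_type: u8] [offset: u64] [size: u64]`, followed by each field's
    /// data blob in field_id order.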
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<usize> {
        use super::vector_data::FlatVectorData;
        use crate::dsl::VectorIndexType;

        /// Pre-serialized field data (ANN indexes, cluster merges)
        struct BlobField {
            field_id: u32,
            index_type: u8,
            data: Vec<u8>,
        }

        /// Flat field to be streamed directly from segments
        struct FlatStreamField {
            field_id: u32,
            dim: usize,
            total_vectors: usize,
        }

        let mut blob_fields: Vec<BlobField> = Vec::new();
        let mut flat_fields: Vec<FlatStreamField> = Vec::new();
        let doc_offs = doc_offsets(segments);

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            // --- Fast path: O(1) cluster merge for homogeneous ScaNN ---
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            let segments_with_vectors = segments
                .iter()
                .filter(|s| s.has_dense_vector_index(field))
                .count();

            if scann_indexes.len() == segments_with_vectors && !scann_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 2,
                            data: bytes,
                        });
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // --- Fast path: O(1) cluster merge for homogeneous IVF-RaBitQ ---
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len() == segments_with_vectors && !ivf_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 1,
                            data: bytes,
                        });
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // --- Determine field dimension and total vector count ---
            let config = entry.dense_vector_config.as_ref();

            // Helper: get dim from segments
            let dim_from_segments = || -> usize {
                segments
                    .iter()
                    .filter_map(|s| match s.vector_indexes().get(&field.0) {
                        Some(super::VectorIndex::Flat(f)) => Some(f.dim),
                        _ => None,
                    })
                    .find(|&d| d > 0)
                    .unwrap_or(0)
            };
            let dim = config
                .map(|c| c.index_dim())
                .unwrap_or_else(dim_from_segments);
            if dim == 0 {
                continue;
            }

            // Check if all segments have Flat indexes (enables streaming)
            let all_flat = segments.iter().all(|s| {
                matches!(
                    s.vector_indexes().get(&field.0),
                    Some(super::VectorIndex::Flat(_)) | None
                )
            });

            // Check if ANN rebuild is possible
            let ann_type =
                trained
                    .zip(config)
                    .and_then(|(trained, config)| match config.index_type {
                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(&field.0) => {
                            Some(VectorIndexType::IvfRaBitQ)
                        }
                        VectorIndexType::ScaNN
                            if trained.centroids.contains_key(&field.0)
                                && trained.codebooks.contains_key(&field.0) =>
                        {
                            Some(VectorIndexType::ScaNN)
                        }
                        _ => None,
                    });

            if let Some(ann) = ann_type {
                // --- Streaming ANN rebuild: iterate segments, call add_vector ---
                // No Vec<Vec<f32>> collection — only the ANN index structure in memory.
                let trained = trained.unwrap();
                let config = config.unwrap();
                let mut total_vectors = 0usize;

                match ann {
                    VectorIndexType::IvfRaBitQ => {
                        let centroids = &trained.centroids[&field.0];
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let mut ivf_index = crate::structures::IVFRaBitQIndex::new(
                            ivf_config,
                            centroids.version,
                            codebook.version,
                        );

                        // Stream vectors from each segment directly into the index
                        for (seg_idx, segment) in segments.iter().enumerate() {
                            let offset = doc_offs[seg_idx];
                            Self::for_each_vector(
                                segment,
                                field,
                                offset,
                                |doc_id, ordinal, vec| {
                                    ivf_index
                                        .add_vector(centroids, &codebook, doc_id, ordinal, vec);
                                    total_vectors += 1;
                                },
                            );
                        }

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 1,
                            data: bytes,
                        });
                        log::info!(
                            "Rebuilt IVF-RaBitQ for field {} ({} vectors, streaming)",
                            field.0,
                            total_vectors
                        );
                    }
                    VectorIndexType::ScaNN => {
                        let centroids = &trained.centroids[&field.0];
                        let codebook = &trained.codebooks[&field.0];
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let mut ivf_pq_index = crate::structures::IVFPQIndex::new(
                            ivf_pq_config,
                            centroids.version,
                            codebook.version,
                        );

                        // Stream vectors from each segment directly into the index
                        for (seg_idx, segment) in segments.iter().enumerate() {
                            let offset = doc_offs[seg_idx];
                            Self::for_each_vector(
                                segment,
                                field,
                                offset,
                                |doc_id, ordinal, vec| {
                                    ivf_pq_index
                                        .add_vector(centroids, codebook, doc_id, ordinal, vec);
                                    total_vectors += 1;
                                },
                            );
                        }

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 2,
                            data: bytes,
                        });
                        log::info!(
                            "Rebuilt ScaNN for field {} ({} vectors, streaming)",
                            field.0,
                            total_vectors
                        );
                    }
                    _ => {}
                }
            } else if all_flat {
                // --- Streaming flat merge: zero extra memory for vectors ---
                let total_vectors: usize = segments
                    .iter()
                    .filter_map(|s| {
                        if let Some(super::VectorIndex::Flat(f)) = s.vector_indexes().get(&field.0)
                        {
                            Some(f.num_vectors())
                        } else {
                            None
                        }
                    })
                    .sum();

                if total_vectors > 0 {
                    flat_fields.push(FlatStreamField {
                        field_id: field.0,
                        dim,
                        total_vectors,
                    });
                }
            } else {
                // --- Non-flat segments without ANN: serialize as Flat blob ---
                // Stream vectors through a writer buffer (no Vec<Vec<f32>>)
                let mut total_vectors = 0usize;
                for segment in segments {
                    Self::for_each_vector(segment, field, 0, |_, _, _| {
                        total_vectors += 1;
                    });
                }
                if total_vectors == 0 {
                    continue;
                }

                let total_size = FlatVectorData::serialized_binary_size(dim, total_vectors);
                let mut buf = Vec::with_capacity(total_size);
                FlatVectorData::write_binary_header(dim, total_vectors, &mut buf)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                // Pass 1: write vectors
                for segment in segments {
                    Self::for_each_vector(segment, field, 0, |_, _, vec| {
                        // Reinterpret the f32 slice as raw bytes for bulk writing
                        let bytes: &[u8] = unsafe {
                            std::slice::from_raw_parts(
                                vec.as_ptr() as *const u8,
                                std::mem::size_of_val(vec),
                            )
                        };
                        let _ = buf.write_all(bytes);
                    });
                }

                // Pass 2: write doc_ids with offsets
                for (seg_idx, segment) in segments.iter().enumerate() {
                    let offset = doc_offs[seg_idx];
                    Self::for_each_vector(segment, field, offset, |doc_id, ordinal, _| {
                        let _ = buf.write_all(&doc_id.to_le_bytes());
                        let _ = buf.write_all(&ordinal.to_le_bytes());
                    });
                }

                blob_fields.push(BlobField {
                    field_id: field.0,
                    index_type: 4,
                    data: buf,
                });
            }
        }

        // --- Write vectors file with streaming support ---
        let total_fields = blob_fields.len() + flat_fields.len();
        if total_fields == 0 {
            return Ok(0);
        }

        // Build sorted field entries with pre-computed sizes
        struct FieldEntry {
            field_id: u32,
            index_type: u8,
            data_size: u64,
            /// Index into blob_fields (None = flat stream)
            blob_idx: Option<usize>,
            /// Index into flat_fields (None = blob)
            flat_idx: Option<usize>,
        }

        let mut entries: Vec<FieldEntry> = Vec::with_capacity(total_fields);

        for (i, blob) in blob_fields.iter().enumerate() {
            entries.push(FieldEntry {
                field_id: blob.field_id,
                index_type: blob.index_type,
                data_size: blob.data.len() as u64,
                blob_idx: Some(i),
                flat_idx: None,
            });
        }

        for (i, flat) in flat_fields.iter().enumerate() {
            entries.push(FieldEntry {
                field_id: flat.field_id,
                index_type: 4, // Flat binary
                data_size: FlatVectorData::serialized_binary_size(flat.dim, flat.total_vectors)
                    as u64,
                blob_idx: None,
                flat_idx: Some(i),
            });
        }

        entries.sort_by_key(|e| e.field_id);

        // Write header + data
        use byteorder::{LittleEndian, WriteBytesExt};
        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

        let per_field_entry =
            size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
        let header_size = size_of::<u32>() + entries.len() * per_field_entry;

        writer.write_u32::<LittleEndian>(entries.len() as u32)?;

        let mut current_offset = header_size as u64;
        for entry in &entries {
            writer.write_u32::<LittleEndian>(entry.field_id)?;
            writer.write_u8(entry.index_type)?;
            writer.write_u64::<LittleEndian>(current_offset)?;
            writer.write_u64::<LittleEndian>(entry.data_size)?;
            current_offset += entry.data_size;
        }

        // Write field data (blobs directly, flat fields streamed from segments)
        for entry in &entries {
            if let Some(blob_idx) = entry.blob_idx {
                writer.write_all(&blob_fields[blob_idx].data)?;
            } else if let Some(flat_idx) = entry.flat_idx {
                let flat = &flat_fields[flat_idx];

                // Write binary header
                FlatVectorData::write_binary_header(flat.dim, flat.total_vectors, &mut writer)?;

                // Pass 1: stream raw vector bytes from each segment (bulk copy)
                for segment in segments {
                    if let Some(super::VectorIndex::Flat(flat_data)) =
                        segment.vector_indexes().get(&entry.field_id)
                    {
                        writer.write_all(flat_data.vectors_as_bytes())?;
                    }
                }

                // Pass 2: stream doc_ids with offset adjustment
                for (seg_idx, segment) in segments.iter().enumerate() {
                    if let Some(super::VectorIndex::Flat(flat_data)) =
                        segment.vector_indexes().get(&entry.field_id)
                    {
                        let offset = doc_offs[seg_idx];
                        for &(doc_id, ordinal) in &flat_data.doc_ids {
                            writer.write_all(&(offset + doc_id).to_le_bytes())?;
                            writer.write_all(&ordinal.to_le_bytes())?;
                        }
                    }
                }
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;
        Ok(output_size)
    }

    /// Merge sparse vector indexes using block stacking
    ///
    /// This is O(blocks) instead of O(postings) - we stack blocks directly
    /// and only adjust the first_doc_id in each block header by the doc offset.
    /// Deltas within blocks remain unchanged since they're relative.
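    ///
    /// (e.g. a block whose header reads first_doc_id=7, coming from a segment
    /// with doc offset 100, is rewritten to first_doc_id=107; the delta-encoded
    /// payload is untouched)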
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        // Collect all sparse vector fields from schema
        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // Collect field data: (field_id, quantization, num_dims, dim_id -> merged posting bytes)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            // Get quantization from config
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Collect all dimensions across all segments for this field
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read ALL posting lists per segment in one I/O call each.
            // This replaces 80K+ individual get_posting() calls with ~4 bulk reads.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge from in-memory data — no I/O in this loop
            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Drop bulk data before accumulating output
            drop(segment_postings);

            // Store num_dims (active count) instead of max_dim_id
            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        // Sort by field_id
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Compute header size and per-dimension offsets before writing
        // num_fields(u32) + per-field: field_id(u32) + quant(u8) + num_dims(u32)
        // per-dim: dim_id(u32) + offset(u64) + length(u32)
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += per_field_header as u64;
            header_size += (*num_dims as u64) * per_dim_entry as u64;
        }

        // Pre-compute offset tables (small — just dim_id + offset + length per dim)
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        // Stream header + tables + data directly to disk
        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Stream posting data per-field, per-dimension (drop each after writing)
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
}

/// Delete segment files from directory
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    // Positions and sparse files are written by the merger too; delete them
    // as well so a removed segment leaves nothing behind.
    let _ = dir.delete(&files.positions).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}