Skip to main content

hermes_core/segment/
merger.rs

1//! Segment merger for combining multiple segments
2
3use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use rustc_hash::FxHashMap;
8
9use super::builder::{SegmentBuilder, SegmentBuilderConfig};
10use super::reader::SegmentReader;
11use super::store::StoreMerger;
12use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
13use crate::Result;
14use crate::directories::{Directory, DirectoryWriter};
15use crate::dsl::{FieldType, Schema};
16use crate::structures::{
17    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
18};
19
/// Counters and size figures collected while merging segments.
///
/// Every field is a plain `usize`; the two memory fields are estimates.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// How many terms were processed.
    pub terms_processed: usize,
    /// How many postings were merged.
    pub postings_merged: usize,
    /// Estimated high-water mark of memory use, in bytes.
    pub peak_memory_bytes: usize,
    /// Estimated memory in use at the last sample, in bytes.
    pub current_memory_bytes: usize,
    /// Size of the term dictionary output, in bytes.
    pub term_dict_bytes: usize,
    /// Size of the postings output, in bytes.
    pub postings_bytes: usize,
    /// Size of the store output, in bytes.
    pub store_bytes: usize,
    /// Size of the dense vector index output, in bytes.
    pub vectors_bytes: usize,
    /// Size of the sparse vector index output, in bytes.
    pub sparse_bytes: usize,
}

impl MergeStats {
    /// Render a byte count as a short human-readable string.
    ///
    /// Uses 1024-based units. Values below 1024 are printed as an integer
    /// followed by `B`; larger values are printed with two decimals in the
    /// largest of `KB`, `MB` or `GB` that does not exceed the value.
    pub fn format_memory(bytes: usize) -> String {
        const KIB: usize = 1024;
        const MIB: usize = KIB * 1024;
        const GIB: usize = MIB * 1024;

        let scaled = |unit: usize| bytes as f64 / unit as f64;

        match bytes {
            b if b >= GIB => format!("{:.2} GB", scaled(GIB)),
            b if b >= MIB => format!("{:.2} MB", scaled(MIB)),
            b if b >= KIB => format!("{:.2} KB", scaled(KIB)),
            b => format!("{} B", b),
        }
    }
}
57
58/// Entry for k-way merge heap
59struct MergeEntry {
60    key: Vec<u8>,
61    term_info: TermInfo,
62    segment_idx: usize,
63    doc_offset: u32,
64}
65
66impl PartialEq for MergeEntry {
67    fn eq(&self, other: &Self) -> bool {
68        self.key == other.key
69    }
70}
71
72impl Eq for MergeEntry {}
73
74impl PartialOrd for MergeEntry {
75    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76        Some(self.cmp(other))
77    }
78}
79
80impl Ord for MergeEntry {
81    fn cmp(&self, other: &Self) -> Ordering {
82        // Reverse order for min-heap (BinaryHeap is max-heap by default)
83        other.key.cmp(&self.key)
84    }
85}
86
87/// Segment merger - merges multiple segments into one
88pub struct SegmentMerger {
89    schema: Arc<Schema>,
90}
91
92impl SegmentMerger {
93    pub fn new(schema: Arc<Schema>) -> Self {
94        Self { schema }
95    }
96
97    /// Merge segments - uses optimized store stacking when possible
98    pub async fn merge<D: Directory + DirectoryWriter>(
99        &self,
100        dir: &D,
101        segments: &[SegmentReader],
102        new_segment_id: SegmentId,
103    ) -> Result<SegmentMeta> {
104        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
105        Ok(meta)
106    }
107
108    /// Merge segments with memory tracking - returns merge statistics
109    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
110        &self,
111        dir: &D,
112        segments: &[SegmentReader],
113        new_segment_id: SegmentId,
114    ) -> Result<(SegmentMeta, MergeStats)> {
115        // Check if we can use optimized store stacking (no dictionaries)
116        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
117
118        // Check if any segment has positions - if so, use rebuild merge
119        // (positions require doc ID remapping which optimized merge doesn't handle)
120        let has_positions = self
121            .schema
122            .fields()
123            .any(|(_, entry)| entry.positions.is_some());
124
125        // Note: sparse vectors now use optimized block stacking merge (no rebuild needed)
126        if can_stack_stores && !has_positions {
127            self.merge_optimized_with_stats(dir, segments, new_segment_id)
128                .await
129        } else {
130            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
131                .await
132        }
133    }
134
135    /// Optimized merge with stats tracking
136    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
137        &self,
138        dir: &D,
139        segments: &[SegmentReader],
140        new_segment_id: SegmentId,
141    ) -> Result<(SegmentMeta, MergeStats)> {
142        self.merge_core(
143            dir,
144            segments,
145            new_segment_id,
146            DenseVectorStrategy::MergeExisting,
147        )
148        .await
149    }
150
151    /// Core merge: handles all mandatory parts (postings, store, sparse, field stats, meta)
152    /// and delegates dense vector handling to the provided strategy.
153    async fn merge_core<D: Directory + DirectoryWriter>(
154        &self,
155        dir: &D,
156        segments: &[SegmentReader],
157        new_segment_id: SegmentId,
158        dense_strategy: DenseVectorStrategy<'_>,
159    ) -> Result<(SegmentMeta, MergeStats)> {
160        let mut stats = MergeStats::default();
161        let files = SegmentFiles::new(new_segment_id.0);
162
163        // === Mandatory: merge postings ===
164        let mut term_dict_data = Vec::new();
165        let mut postings_data = Vec::new();
166        let terms_processed = self
167            .merge_postings_with_stats(
168                segments,
169                &mut term_dict_data,
170                &mut postings_data,
171                &mut stats,
172            )
173            .await?;
174        stats.terms_processed = terms_processed;
175        stats.term_dict_bytes = term_dict_data.len();
176        stats.postings_bytes = postings_data.len();
177
178        let current_mem = term_dict_data.capacity() + postings_data.capacity();
179        stats.current_memory_bytes = current_mem;
180        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
181
182        // === Mandatory: stack store files ===
183        let mut store_data = Vec::new();
184        {
185            let mut store_merger = StoreMerger::new(&mut store_data);
186            for segment in segments {
187                let raw_blocks = segment.store_raw_blocks();
188                let data_slice = segment.store_data_slice();
189                store_merger.append_store(data_slice, &raw_blocks).await?;
190            }
191            store_merger.finish()?;
192        }
193        stats.store_bytes = store_data.len();
194
195        let current_mem =
196            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
197        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
198
199        dir.write(&files.term_dict, &term_dict_data).await?;
200        dir.write(&files.postings, &postings_data).await?;
201        dir.write(&files.store, &store_data).await?;
202
203        drop(term_dict_data);
204        drop(postings_data);
205        drop(store_data);
206
207        // === Dense vectors: strategy-dependent ===
208        let vectors_bytes = match &dense_strategy {
209            DenseVectorStrategy::MergeExisting => {
210                self.merge_dense_vectors_with_stats(dir, segments, &files)
211                    .await?
212            }
213            DenseVectorStrategy::BuildAnn(trained) => {
214                self.build_ann_vectors(dir, segments, &files, trained)
215                    .await?
216            }
217        };
218        stats.vectors_bytes = vectors_bytes;
219
220        // === Mandatory: merge sparse vectors ===
221        let sparse_bytes = self
222            .merge_sparse_vectors_optimized(dir, segments, &files)
223            .await?;
224        stats.sparse_bytes = sparse_bytes;
225
226        // === Mandatory: merge field stats + write meta ===
227        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
228        for segment in segments {
229            for (&field_id, field_stats) in &segment.meta().field_stats {
230                let entry = merged_field_stats.entry(field_id).or_default();
231                entry.total_tokens += field_stats.total_tokens;
232                entry.doc_count += field_stats.doc_count;
233            }
234        }
235
236        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
237        let meta = SegmentMeta {
238            id: new_segment_id.0,
239            num_docs: total_docs,
240            field_stats: merged_field_stats,
241        };
242
243        dir.write(&files.meta, &meta.serialize()?).await?;
244
245        let label = match &dense_strategy {
246            DenseVectorStrategy::MergeExisting => "Merge",
247            DenseVectorStrategy::BuildAnn(_) => "ANN merge",
248        };
249        log::info!(
250            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
251            label,
252            total_docs,
253            stats.terms_processed,
254            MergeStats::format_memory(stats.term_dict_bytes),
255            MergeStats::format_memory(stats.postings_bytes),
256            MergeStats::format_memory(stats.store_bytes),
257            MergeStats::format_memory(stats.vectors_bytes),
258            MergeStats::format_memory(stats.sparse_bytes),
259        );
260
261        Ok((meta, stats))
262    }
263
264    /// Fallback merge with stats tracking
265    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
266        &self,
267        dir: &D,
268        segments: &[SegmentReader],
269        new_segment_id: SegmentId,
270    ) -> Result<(SegmentMeta, MergeStats)> {
271        let mut stats = MergeStats::default();
272
273        let mut builder =
274            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
275
276        for segment in segments {
277            for doc_id in 0..segment.num_docs() {
278                if let Some(doc) = segment.doc(doc_id).await? {
279                    builder.add_document(doc)?;
280                }
281
282                // Track memory periodically
283                if doc_id % 10000 == 0 {
284                    let builder_stats = builder.stats();
285                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
286                    stats.peak_memory_bytes =
287                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
288                }
289            }
290        }
291
292        let meta = builder.build(dir, new_segment_id).await?;
293        Ok((meta, stats))
294    }
295
296    /// Merge postings from multiple segments using streaming k-way merge
297    ///
298    /// This implementation uses a min-heap to merge terms from all segments
299    /// in sorted order without loading all terms into memory at once.
300    /// Memory usage is O(num_segments) instead of O(total_terms).
301    ///
302    /// Optimization: For terms that exist in only one segment, we copy the
303    /// posting data directly without decode/encode. Only terms that exist
304    /// in multiple segments need full merge.
305    ///
306    /// Returns the number of terms processed.
307    async fn merge_postings_with_stats(
308        &self,
309        segments: &[SegmentReader],
310        term_dict: &mut Vec<u8>,
311        postings_out: &mut Vec<u8>,
312        stats: &mut MergeStats,
313    ) -> Result<usize> {
314        // Calculate doc offsets for each segment
315        let mut doc_offsets = Vec::with_capacity(segments.len());
316        let mut offset = 0u32;
317        for segment in segments {
318            doc_offsets.push(offset);
319            offset += segment.num_docs();
320        }
321
322        // Create iterators for each segment's term dictionary
323        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();
324
325        // Initialize min-heap with first entry from each segment
326        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
327        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
328            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
329                heap.push(MergeEntry {
330                    key,
331                    term_info,
332                    segment_idx: seg_idx,
333                    doc_offset: doc_offsets[seg_idx],
334                });
335            }
336        }
337
338        // Buffer term results - needed because SSTableWriter can't be held across await points
339        // Memory is bounded by unique terms (typically much smaller than postings)
340        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
341        let mut terms_processed = 0usize;
342
343        while !heap.is_empty() {
344            // Get the smallest key
345            let first = heap.pop().unwrap();
346            let current_key = first.key.clone();
347
348            // Collect all entries with the same key
349            let mut sources: Vec<(usize, TermInfo, u32)> =
350                vec![(first.segment_idx, first.term_info, first.doc_offset)];
351
352            // Advance the iterator that provided this entry
353            if let Some((key, term_info)) = iterators[first.segment_idx]
354                .next()
355                .await
356                .map_err(crate::Error::from)?
357            {
358                heap.push(MergeEntry {
359                    key,
360                    term_info,
361                    segment_idx: first.segment_idx,
362                    doc_offset: doc_offsets[first.segment_idx],
363                });
364            }
365
366            // Check if other segments have the same key
367            while let Some(entry) = heap.peek() {
368                if entry.key != current_key {
369                    break;
370                }
371                let entry = heap.pop().unwrap();
372                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));
373
374                // Advance this iterator too
375                if let Some((key, term_info)) = iterators[entry.segment_idx]
376                    .next()
377                    .await
378                    .map_err(crate::Error::from)?
379                {
380                    heap.push(MergeEntry {
381                        key,
382                        term_info,
383                        segment_idx: entry.segment_idx,
384                        doc_offset: doc_offsets[entry.segment_idx],
385                    });
386                }
387            }
388
389            // Process this term
390            let term_info = if sources.len() == 1 {
391                // Optimization: Term exists in only one segment - copy directly
392                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
393                self.copy_term_posting(
394                    &segments[*seg_idx],
395                    source_info,
396                    *seg_doc_offset,
397                    postings_out,
398                )
399                .await?
400            } else {
401                // Term exists in multiple segments - need full merge
402                self.merge_term_postings(segments, &sources, postings_out)
403                    .await?
404            };
405
406            term_results.push((current_key, term_info));
407            terms_processed += 1;
408
409            // Log progress every 100k terms
410            if terms_processed.is_multiple_of(100_000) {
411                log::debug!("Merge progress: {} terms processed", terms_processed);
412            }
413        }
414
415        // Track memory
416        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
417        stats.current_memory_bytes = results_mem + postings_out.capacity();
418        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);
419
420        log::info!(
421            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={:.2} MB, peak={:.2} MB",
422            terms_processed,
423            segments.len(),
424            results_mem as f64 / (1024.0 * 1024.0),
425            postings_out.capacity() as f64 / (1024.0 * 1024.0),
426            stats.peak_memory_bytes as f64 / (1024.0 * 1024.0)
427        );
428
429        // Write to SSTable (sync, no await points)
430        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
431        for (key, term_info) in term_results {
432            writer.insert(&key, &term_info)?;
433        }
434        writer.finish()?;
435
436        Ok(terms_processed)
437    }
438
439    /// Copy a term's posting data directly from source segment (no decode/encode)
440    /// Only adjusts doc IDs by adding the segment's doc offset
441    async fn copy_term_posting(
442        &self,
443        segment: &SegmentReader,
444        source_info: &TermInfo,
445        doc_offset: u32,
446        postings_out: &mut Vec<u8>,
447    ) -> Result<TermInfo> {
448        // Handle inline postings - need to remap doc IDs
449        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
450            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
451            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
452                return Ok(inline);
453            }
454            // If can't inline after remapping (shouldn't happen), fall through to external
455            let mut pl = PostingList::with_capacity(remapped_ids.len());
456            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
457                pl.push(doc_id, tf);
458            }
459            let posting_offset = postings_out.len() as u64;
460            let block_list = BlockPostingList::from_posting_list(&pl)?;
461            let mut encoded = Vec::new();
462            block_list.serialize(&mut encoded)?;
463            postings_out.extend_from_slice(&encoded);
464            return Ok(TermInfo::external(
465                posting_offset,
466                encoded.len() as u32,
467                pl.doc_count(),
468            ));
469        }
470
471        // External posting - read, decode, remap doc IDs, re-encode
472        // Note: We can't just copy bytes because doc IDs are delta-encoded
473        let (offset, len) = source_info.external_info().unwrap();
474        let posting_bytes = segment.read_postings(offset, len).await?;
475        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
476
477        // Remap doc IDs
478        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
479        let mut iter = source_postings.iterator();
480        while iter.doc() != TERMINATED {
481            remapped.add(iter.doc() + doc_offset, iter.term_freq());
482            iter.advance();
483        }
484
485        // Try to inline if small enough
486        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
487        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();
488
489        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
490            return Ok(inline);
491        }
492
493        // Write to postings file
494        let posting_offset = postings_out.len() as u64;
495        let block_list = BlockPostingList::from_posting_list(&remapped)?;
496        let mut encoded = Vec::new();
497        block_list.serialize(&mut encoded)?;
498        postings_out.extend_from_slice(&encoded);
499
500        Ok(TermInfo::external(
501            posting_offset,
502            encoded.len() as u32,
503            remapped.doc_count(),
504        ))
505    }
506
507    /// Merge postings for a term that exists in multiple segments
508    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
509    async fn merge_term_postings(
510        &self,
511        segments: &[SegmentReader],
512        sources: &[(usize, TermInfo, u32)],
513        postings_out: &mut Vec<u8>,
514    ) -> Result<TermInfo> {
515        // Sort sources by doc_offset to ensure postings are added in sorted order
516        let mut sorted_sources: Vec<_> = sources.to_vec();
517        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);
518
519        // Check if all sources are external (can use block concatenation)
520        let all_external = sorted_sources
521            .iter()
522            .all(|(_, term_info, _)| term_info.external_info().is_some());
523
524        if all_external && sorted_sources.len() > 1 {
525            // Fast path: block-level concatenation
526            let mut block_sources = Vec::with_capacity(sorted_sources.len());
527
528            for (seg_idx, term_info, doc_offset) in &sorted_sources {
529                let segment = &segments[*seg_idx];
530                let (offset, len) = term_info.external_info().unwrap();
531                let posting_bytes = segment.read_postings(offset, len).await?;
532                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
533                block_sources.push((source_postings, *doc_offset));
534            }
535
536            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
537            let posting_offset = postings_out.len() as u64;
538            let mut encoded = Vec::new();
539            merged_blocks.serialize(&mut encoded)?;
540            postings_out.extend_from_slice(&encoded);
541
542            return Ok(TermInfo::external(
543                posting_offset,
544                encoded.len() as u32,
545                merged_blocks.doc_count(),
546            ));
547        }
548
549        // Slow path: full decode/encode for inline postings or single source
550        let mut merged = PostingList::new();
551
552        for (seg_idx, term_info, doc_offset) in &sorted_sources {
553            let segment = &segments[*seg_idx];
554
555            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
556                // Inline posting list
557                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
558                    merged.add(doc_id + doc_offset, tf);
559                }
560            } else {
561                // External posting list
562                let (offset, len) = term_info.external_info().unwrap();
563                let posting_bytes = segment.read_postings(offset, len).await?;
564                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
565
566                let mut iter = source_postings.iterator();
567                while iter.doc() != TERMINATED {
568                    merged.add(iter.doc() + doc_offset, iter.term_freq());
569                    iter.advance();
570                }
571            }
572        }
573
574        // Try to inline small posting lists
575        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
576        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
577
578        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
579            return Ok(inline);
580        }
581
582        // Write to postings file
583        let posting_offset = postings_out.len() as u64;
584        let block_list = BlockPostingList::from_posting_list(&merged)?;
585        let mut encoded = Vec::new();
586        block_list.serialize(&mut encoded)?;
587        postings_out.extend_from_slice(&encoded);
588
589        Ok(TermInfo::external(
590            posting_offset,
591            encoded.len() as u32,
592            merged.doc_count(),
593        ))
594    }
595    /// Merge dense vector indexes with stats tracking - returns output size in bytes
596    ///
597    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
598    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
599    /// For RaBitQ: Must rebuild with new centroid from all vectors
600    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
601        &self,
602        dir: &D,
603        segments: &[SegmentReader],
604        files: &SegmentFiles,
605    ) -> Result<usize> {
606        use byteorder::{LittleEndian, WriteBytesExt};
607
608        // (field_id, index_type, data)
609        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
610
611        for (field, entry) in self.schema.fields() {
612            if !matches!(entry.field_type, FieldType::DenseVector) {
613                continue;
614            }
615
616            // Check if all segments have ScaNN indexes for this field
617            let scann_indexes: Vec<_> = segments
618                .iter()
619                .filter_map(|s| s.get_scann_vector_index(field))
620                .collect();
621
622            if scann_indexes.len()
623                == segments
624                    .iter()
625                    .filter(|s| s.has_dense_vector_index(field))
626                    .count()
627                && !scann_indexes.is_empty()
628            {
629                // All segments have ScaNN - use O(1) cluster merge!
630                let refs: Vec<&crate::structures::IVFPQIndex> =
631                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
632
633                // Calculate doc_id offsets
634                let mut doc_offsets = Vec::with_capacity(segments.len());
635                let mut offset = 0u32;
636                for segment in segments {
637                    doc_offsets.push(offset);
638                    offset += segment.num_docs();
639                }
640
641                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
642                    Ok(merged) => {
643                        let bytes = merged
644                            .to_bytes()
645                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
646                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
647                        continue;
648                    }
649                    Err(e) => {
650                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
651                    }
652                }
653            }
654
655            // Check if all segments have IVF indexes for this field
656            let ivf_indexes: Vec<_> = segments
657                .iter()
658                .filter_map(|s| s.get_ivf_vector_index(field))
659                .collect();
660
661            if ivf_indexes.len()
662                == segments
663                    .iter()
664                    .filter(|s| s.has_dense_vector_index(field))
665                    .count()
666                && !ivf_indexes.is_empty()
667            {
668                // All segments have IVF - use O(1) cluster merge!
669                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
670                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
671
672                // Calculate doc_id offsets
673                let mut doc_offsets = Vec::with_capacity(segments.len());
674                let mut offset = 0u32;
675                for segment in segments {
676                    doc_offsets.push(offset);
677                    offset += segment.num_docs();
678                }
679
680                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
681                    Ok(merged) => {
682                        let bytes = merged
683                            .to_bytes()
684                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
685                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
686                        continue;
687                    }
688                    Err(e) => {
689                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
690                    }
691                }
692            }
693
694            // Fall back to RaBitQ rebuild (collect raw vectors)
695            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
696
697            for segment in segments {
698                if let Some(index) = segment.get_dense_vector_index(field)
699                    && let Some(raw_vecs) = &index.raw_vectors
700                {
701                    all_vectors.extend(raw_vecs.iter().cloned());
702                }
703            }
704
705            if !all_vectors.is_empty() {
706                let dim = all_vectors[0].len();
707                let config = RaBitQConfig::new(dim);
708                let merged_index = RaBitQIndex::build(config, &all_vectors, true);
709
710                let index_bytes = serde_json::to_vec(&merged_index)
711                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;
712
713                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
714            }
715        }
716
717        // Write unified vectors file with index_type
718        if !field_indexes.is_empty() {
719            field_indexes.sort_by_key(|(id, _, _)| *id);
720
721            // Header: num_fields + (field_id, index_type, offset, len) per field
722            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
723            let mut output = Vec::new();
724
725            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
726
727            let mut current_offset = header_size as u64;
728            for (field_id, index_type, data) in &field_indexes {
729                output.write_u32::<LittleEndian>(*field_id)?;
730                output.write_u8(*index_type)?;
731                output.write_u64::<LittleEndian>(current_offset)?;
732                output.write_u64::<LittleEndian>(data.len() as u64)?;
733                current_offset += data.len() as u64;
734            }
735
736            for (_, _, data) in field_indexes {
737                output.extend_from_slice(&data);
738            }
739
740            let output_size = output.len();
741            dir.write(&files.vectors, &output).await?;
742            return Ok(output_size);
743        }
744
745        Ok(0)
746    }
747
748    /// Merge sparse vector indexes using optimized block stacking
749    ///
750    /// This is O(blocks) instead of O(postings) - we stack blocks directly
751    /// and only adjust the first_doc_id in each block header by the doc offset.
752    /// Deltas within blocks remain unchanged since they're relative.
753    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
754        &self,
755        dir: &D,
756        segments: &[SegmentReader],
757        files: &SegmentFiles,
758    ) -> Result<usize> {
759        use crate::structures::BlockSparsePostingList;
760        use byteorder::{LittleEndian, WriteBytesExt};
761
762        // Calculate doc offsets for each segment
763        let mut doc_offsets = Vec::with_capacity(segments.len());
764        let mut offset = 0u32;
765        for (i, segment) in segments.iter().enumerate() {
766            log::debug!(
767                "Sparse merge: segment {} has {} docs, doc_offset={}",
768                i,
769                segment.num_docs(),
770                offset
771            );
772            doc_offsets.push(offset);
773            offset += segment.num_docs();
774        }
775
776        // Collect all sparse vector fields from schema
777        let sparse_fields: Vec<_> = self
778            .schema
779            .fields()
780            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
781            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
782            .collect();
783
784        if sparse_fields.is_empty() {
785            return Ok(0);
786        }
787
788        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> merged_posting_list)
789        type SparseFieldData = (
790            u32,
791            crate::structures::WeightQuantization,
792            u32,
793            FxHashMap<u32, Vec<u8>>,
794        );
795        let mut field_data: Vec<SparseFieldData> = Vec::new();
796
797        for (field, sparse_config) in &sparse_fields {
798            // Get quantization from config
799            let quantization = sparse_config
800                .as_ref()
801                .map(|c| c.weight_quantization)
802                .unwrap_or(crate::structures::WeightQuantization::Float32);
803
804            // Collect all dimensions across all segments for this field
805            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
806            for segment in segments {
807                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
808                    for dim_id in sparse_index.active_dimensions() {
809                        all_dims.insert(dim_id);
810                    }
811                }
812            }
813
814            if all_dims.is_empty() {
815                continue;
816            }
817
818            let _max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
819            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
820
821            // For each dimension, merge posting lists from all segments
822            for dim_id in all_dims {
823                // Collect posting lists for this dimension from all segments
824                // Keep Arcs alive while we borrow from them
825                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();
826
827                for (seg_idx, segment) in segments.iter().enumerate() {
828                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
829                        && let Ok(Some(posting_list)) = sparse_index.get_posting(dim_id).await
830                    {
831                        log::trace!(
832                            "Sparse merge dim={}: seg={} offset={} doc_count={} blocks={}",
833                            dim_id,
834                            seg_idx,
835                            doc_offsets[seg_idx],
836                            posting_list.doc_count(),
837                            posting_list.blocks.len()
838                        );
839                        posting_arcs.push((posting_list, doc_offsets[seg_idx]));
840                    }
841                }
842
843                // Create references for merge
844                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
845                    .iter()
846                    .map(|(pl, offset)| (pl.as_ref(), *offset))
847                    .collect();
848
849                if lists_with_offsets.is_empty() {
850                    continue;
851                }
852
853                // Merge using optimized block stacking
854                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);
855
856                log::trace!(
857                    "Sparse merge dim={}: merged {} lists -> doc_count={} blocks={}",
858                    dim_id,
859                    lists_with_offsets.len(),
860                    merged.doc_count(),
861                    merged.blocks.len()
862                );
863
864                // Serialize
865                let mut bytes = Vec::new();
866                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
867                dim_bytes.insert(dim_id, bytes);
868            }
869
870            // Store num_dims (active count) instead of max_dim_id
871            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
872        }
873
874        if field_data.is_empty() {
875            return Ok(0);
876        }
877
878        // Sort by field_id
879        field_data.sort_by_key(|(id, _, _, _)| *id);
880
881        // Build sparse file (compact format - only active dimensions)
882        // Header: num_fields (4)
883        // Per field: field_id (4) + quant (1) + num_dims (4) + table (16 * num_dims)
884        let mut header_size = 4u64;
885        for (_, _, num_dims, _) in &field_data {
886            header_size += 4 + 1 + 4; // field_id + quant + num_dims
887            header_size += (*num_dims as u64) * 16; // table entries: (dim_id, offset, length)
888        }
889
890        let mut output = Vec::new();
891        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
892
893        let mut current_offset = header_size;
894        let mut all_data: Vec<u8> = Vec::new();
895        // (dim_id, offset, length) for each active dimension
896        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
897
898        for (_, _, _, dim_bytes) in &field_data {
899            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
900
901            // Sort dimensions for deterministic output
902            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
903            dims.sort();
904
905            for dim_id in dims {
906                let bytes = &dim_bytes[&dim_id];
907                table.push((dim_id, current_offset, bytes.len() as u32));
908                current_offset += bytes.len() as u64;
909                all_data.extend_from_slice(bytes);
910            }
911            field_tables.push(table);
912        }
913
914        // Write field headers and compact tables
915        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
916            output.write_u32::<LittleEndian>(*field_id)?;
917            output.write_u8(*quantization as u8)?;
918            output.write_u32::<LittleEndian>(*num_dims)?;
919
920            // Write compact table: (dim_id, offset, length) only for active dims
921            for &(dim_id, offset, length) in &field_tables[i] {
922                output.write_u32::<LittleEndian>(dim_id)?;
923                output.write_u64::<LittleEndian>(offset)?;
924                output.write_u32::<LittleEndian>(length)?;
925            }
926        }
927
928        // Append all data
929        output.extend_from_slice(&all_data);
930
931        let output_size = output.len();
932        dir.write(&files.sparse, &output).await?;
933
934        log::info!(
935            "Sparse vector merge complete: {} fields, {} bytes",
936            field_data.len(),
937            output_size
938        );
939
940        Ok(output_size)
941    }
942}
943
944/// Trained vector index structures for rebuilding segments with ANN indexes
945pub struct TrainedVectorStructures {
946    /// Trained centroids per field_id
947    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
948    /// Trained PQ codebooks per field_id (for ScaNN)
949    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
950}
951
952/// Strategy for handling dense vector indexes during merge
953pub enum DenseVectorStrategy<'a> {
954    /// Merge existing indexes (ScaNN/IVF cluster merge or RaBitQ rebuild)
955    MergeExisting,
956    /// Build ANN indexes from flat vectors using trained structures
957    BuildAnn(&'a TrainedVectorStructures),
958}
959
960impl SegmentMerger {
961    /// Merge segments and rebuild dense vectors with ANN indexes using trained structures
962    ///
963    /// This is called after centroids/codebooks are trained at index level.
964    /// It collects Flat vectors from all segments and builds IVF-RaBitQ or ScaNN indexes.
965    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
966        &self,
967        dir: &D,
968        segments: &[SegmentReader],
969        new_segment_id: SegmentId,
970        trained: &TrainedVectorStructures,
971    ) -> Result<SegmentMeta> {
972        let (meta, _stats) = self
973            .merge_core(
974                dir,
975                segments,
976                new_segment_id,
977                DenseVectorStrategy::BuildAnn(trained),
978            )
979            .await?;
980        Ok(meta)
981    }
982
983    /// Build ANN indexes from Flat vectors using trained centroids
984    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
985        &self,
986        dir: &D,
987        segments: &[SegmentReader],
988        files: &SegmentFiles,
989        trained: &TrainedVectorStructures,
990    ) -> Result<usize> {
991        use crate::dsl::VectorIndexType;
992        use byteorder::{LittleEndian, WriteBytesExt};
993
994        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
995
996        for (field, entry) in self.schema.fields() {
997            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
998                continue;
999            }
1000
1001            let config = match &entry.dense_vector_config {
1002                Some(c) => c,
1003                None => continue,
1004            };
1005
1006            // Collect all Flat vectors from segments
1007            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
1008            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
1009            let mut doc_offset = 0u32;
1010
1011            for segment in segments {
1012                if let Some(super::VectorIndex::Flat(flat_data)) =
1013                    segment.vector_indexes().get(&field.0)
1014                {
1015                    for (vec, (local_doc_id, ordinal)) in
1016                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
1017                    {
1018                        all_vectors.push(vec.clone());
1019                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
1020                    }
1021                }
1022                doc_offset += segment.num_docs();
1023            }
1024
1025            if all_vectors.is_empty() {
1026                continue;
1027            }
1028
1029            let dim = config.index_dim();
1030
1031            // Extract just doc_ids for ANN indexes (they don't track ordinals yet)
1032            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();
1033
1034            // Build ANN index based on index type and available trained structures
1035            match config.index_type {
1036                VectorIndexType::IvfRaBitQ => {
1037                    if let Some(centroids) = trained.centroids.get(&field.0) {
1038                        // Create RaBitQ codebook for the dimension
1039                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
1040                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
1041
1042                        // Build IVF-RaBitQ index
1043                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
1044                            .with_store_raw(config.store_raw);
1045                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
1046                            ivf_config,
1047                            centroids,
1048                            &codebook,
1049                            &all_vectors,
1050                            Some(&ann_doc_ids),
1051                        );
1052
1053                        let index_data = super::builder::IVFRaBitQIndexData {
1054                            centroids: (**centroids).clone(),
1055                            codebook,
1056                            index: ivf_index,
1057                        };
1058                        let bytes = index_data
1059                            .to_bytes()
1060                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1061                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
1062
1063                        log::info!(
1064                            "Built IVF-RaBitQ index for field {} with {} vectors",
1065                            field.0,
1066                            all_vectors.len()
1067                        );
1068                        continue;
1069                    }
1070                }
1071                VectorIndexType::ScaNN => {
1072                    if let (Some(centroids), Some(codebook)) = (
1073                        trained.centroids.get(&field.0),
1074                        trained.codebooks.get(&field.0),
1075                    ) {
1076                        // Build ScaNN (IVF-PQ) index
1077                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
1078                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
1079                            ivf_pq_config,
1080                            centroids,
1081                            codebook,
1082                            &all_vectors,
1083                            Some(&ann_doc_ids),
1084                        );
1085
1086                        let index_data = super::builder::ScaNNIndexData {
1087                            centroids: (**centroids).clone(),
1088                            codebook: (**codebook).clone(),
1089                            index: ivf_pq_index,
1090                        };
1091                        let bytes = index_data
1092                            .to_bytes()
1093                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1094                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
1095
1096                        log::info!(
1097                            "Built ScaNN index for field {} with {} vectors",
1098                            field.0,
1099                            all_vectors.len()
1100                        );
1101                        continue;
1102                    }
1103                }
1104                _ => {}
1105            }
1106
1107            // Fallback: keep as Flat if no trained structures available
1108            let flat_data = super::builder::FlatVectorData {
1109                dim,
1110                vectors: all_vectors,
1111                doc_ids: all_doc_ids,
1112            };
1113            let bytes = flat_data.to_binary_bytes();
1114            field_indexes.push((field.0, 4u8, bytes)); // 4 = Flat Binary
1115        }
1116
1117        // Write vectors file
1118        if !field_indexes.is_empty() {
1119            field_indexes.sort_by_key(|(id, _, _)| *id);
1120
1121            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
1122            let mut output = Vec::new();
1123
1124            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
1125
1126            let mut current_offset = header_size as u64;
1127            for (field_id, index_type, data) in &field_indexes {
1128                output.write_u32::<LittleEndian>(*field_id)?;
1129                output.write_u8(*index_type)?;
1130                output.write_u64::<LittleEndian>(current_offset)?;
1131                output.write_u64::<LittleEndian>(data.len() as u64)?;
1132                current_offset += data.len() as u64;
1133            }
1134
1135            for (_, _, data) in field_indexes {
1136                output.extend_from_slice(&data);
1137            }
1138
1139            let output_size = output.len();
1140            dir.write(&files.vectors, &output).await?;
1141            return Ok(output_size);
1142        }
1143
1144        Ok(0)
1145    }
1146}
1147
1148/// Delete segment files from directory
1149pub async fn delete_segment<D: Directory + DirectoryWriter>(
1150    dir: &D,
1151    segment_id: SegmentId,
1152) -> Result<()> {
1153    let files = SegmentFiles::new(segment_id.0);
1154    let _ = dir.delete(&files.term_dict).await;
1155    let _ = dir.delete(&files.postings).await;
1156    let _ = dir.delete(&files.store).await;
1157    let _ = dir.delete(&files.meta).await;
1158    let _ = dir.delete(&files.vectors).await;
1159    Ok(())
1160}