Skip to main content

hermes_core/segment/
merger.rs

1//! Segment merger for combining multiple segments
2
3use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use rustc_hash::FxHashMap;
8
9use super::builder::{SegmentBuilder, SegmentBuilderConfig};
10use super::reader::SegmentReader;
11use super::store::StoreMerger;
12use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
13use crate::Result;
14use crate::directories::{Directory, DirectoryWriter};
15use crate::dsl::{FieldType, Schema};
16use crate::structures::{
17    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
18};
19
/// Counters and size estimates collected while merging segments.
///
/// Every field starts at zero (`Default`); the merge routines fill in the
/// ones they track.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Count of terms processed by the merge
    pub terms_processed: usize,
    /// Count of postings merged
    pub postings_merged: usize,
    /// Highest estimated memory usage observed, in bytes
    pub peak_memory_bytes: usize,
    /// Most recent estimated memory usage, in bytes
    pub current_memory_bytes: usize,
    /// Size of the term dictionary output, in bytes
    pub term_dict_bytes: usize,
    /// Size of the postings output, in bytes
    pub postings_bytes: usize,
    /// Size of the document store output, in bytes
    pub store_bytes: usize,
    /// Size of the dense vector index output, in bytes
    pub vectors_bytes: usize,
    /// Size of the sparse vector index output, in bytes
    pub sparse_bytes: usize,
}

impl MergeStats {
    /// Render a byte count as a short human-readable string.
    ///
    /// Uses 1024-based units: values below 1024 are printed as whole bytes
    /// (`"{} B"`), larger values with two decimals as `KB`, `MB` or `GB`.
    pub fn format_memory(bytes: usize) -> String {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        const GIB: usize = 1024 * MIB;

        // Powers of two are exactly representable, so the division matches
        // dividing by the equivalent floating-point literal product.
        let in_units_of = |unit: usize| bytes as f64 / unit as f64;

        match bytes {
            b if b >= GIB => format!("{:.2} GB", in_units_of(GIB)),
            b if b >= MIB => format!("{:.2} MB", in_units_of(MIB)),
            b if b >= KIB => format!("{:.2} KB", in_units_of(KIB)),
            b => format!("{} B", b),
        }
    }
}
57
58/// Entry for k-way merge heap
59struct MergeEntry {
60    key: Vec<u8>,
61    term_info: TermInfo,
62    segment_idx: usize,
63    doc_offset: u32,
64}
65
66impl PartialEq for MergeEntry {
67    fn eq(&self, other: &Self) -> bool {
68        self.key == other.key
69    }
70}
71
72impl Eq for MergeEntry {}
73
74impl PartialOrd for MergeEntry {
75    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76        Some(self.cmp(other))
77    }
78}
79
80impl Ord for MergeEntry {
81    fn cmp(&self, other: &Self) -> Ordering {
82        // Reverse order for min-heap (BinaryHeap is max-heap by default)
83        other.key.cmp(&self.key)
84    }
85}
86
87/// Segment merger - merges multiple segments into one
88pub struct SegmentMerger {
89    schema: Arc<Schema>,
90}
91
92impl SegmentMerger {
93    pub fn new(schema: Arc<Schema>) -> Self {
94        Self { schema }
95    }
96
97    /// Merge segments - uses optimized store stacking when possible
98    pub async fn merge<D: Directory + DirectoryWriter>(
99        &self,
100        dir: &D,
101        segments: &[SegmentReader],
102        new_segment_id: SegmentId,
103    ) -> Result<SegmentMeta> {
104        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
105        Ok(meta)
106    }
107
108    /// Merge segments with memory tracking - returns merge statistics
109    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
110        &self,
111        dir: &D,
112        segments: &[SegmentReader],
113        new_segment_id: SegmentId,
114    ) -> Result<(SegmentMeta, MergeStats)> {
115        // Check if we can use optimized store stacking (no dictionaries)
116        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
117
118        // Check if any segment has positions - if so, use rebuild merge
119        // (positions require doc ID remapping which optimized merge doesn't handle)
120        let has_positions = self
121            .schema
122            .fields()
123            .any(|(_, entry)| entry.positions.is_some());
124
125        // Note: sparse vectors now use optimized block stacking merge (no rebuild needed)
126        if can_stack_stores && !has_positions {
127            self.merge_optimized_with_stats(dir, segments, new_segment_id)
128                .await
129        } else {
130            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
131                .await
132        }
133    }
134
135    /// Optimized merge with stats tracking
136    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
137        &self,
138        dir: &D,
139        segments: &[SegmentReader],
140        new_segment_id: SegmentId,
141    ) -> Result<(SegmentMeta, MergeStats)> {
142        let mut stats = MergeStats::default();
143        let files = SegmentFiles::new(new_segment_id.0);
144
145        // Build merged term dictionary and postings
146        let mut term_dict_data = Vec::new();
147        let mut postings_data = Vec::new();
148        let terms_processed = self
149            .merge_postings_with_stats(
150                segments,
151                &mut term_dict_data,
152                &mut postings_data,
153                &mut stats,
154            )
155            .await?;
156        stats.terms_processed = terms_processed;
157        stats.term_dict_bytes = term_dict_data.len();
158        stats.postings_bytes = postings_data.len();
159
160        // Track peak memory (term dict + postings buffers)
161        let current_mem = term_dict_data.capacity() + postings_data.capacity();
162        stats.current_memory_bytes = current_mem;
163        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
164
165        // Stack store files without recompression
166        let mut store_data = Vec::new();
167        {
168            let mut store_merger = StoreMerger::new(&mut store_data);
169            for segment in segments {
170                let raw_blocks = segment.store_raw_blocks();
171                let data_slice = segment.store_data_slice();
172                store_merger.append_store(data_slice, &raw_blocks).await?;
173            }
174            store_merger.finish()?;
175        }
176        stats.store_bytes = store_data.len();
177
178        // Update peak memory
179        let current_mem =
180            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
181        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
182
183        // Write files
184        dir.write(&files.term_dict, &term_dict_data).await?;
185        dir.write(&files.postings, &postings_data).await?;
186        dir.write(&files.store, &store_data).await?;
187
188        // Free memory after writing
189        drop(term_dict_data);
190        drop(postings_data);
191        drop(store_data);
192
193        // Merge dense vector indexes
194        let vectors_bytes = self
195            .merge_dense_vectors_with_stats(dir, segments, &files)
196            .await?;
197        stats.vectors_bytes = vectors_bytes;
198
199        // Merge sparse vector indexes (optimized block stacking)
200        let sparse_bytes = self
201            .merge_sparse_vectors_optimized(dir, segments, &files)
202            .await?;
203        stats.sparse_bytes = sparse_bytes;
204
205        // Merge field stats
206        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
207        for segment in segments {
208            for (&field_id, field_stats) in &segment.meta().field_stats {
209                let entry = merged_field_stats.entry(field_id).or_default();
210                entry.total_tokens += field_stats.total_tokens;
211                entry.doc_count += field_stats.doc_count;
212            }
213        }
214
215        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
216        let meta = SegmentMeta {
217            id: new_segment_id.0,
218            num_docs: total_docs,
219            field_stats: merged_field_stats,
220        };
221
222        dir.write(&files.meta, &meta.serialize()?).await?;
223
224        log::info!(
225            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}, sparse={}",
226            stats.terms_processed,
227            MergeStats::format_memory(stats.term_dict_bytes),
228            MergeStats::format_memory(stats.postings_bytes),
229            MergeStats::format_memory(stats.store_bytes),
230            MergeStats::format_memory(stats.vectors_bytes),
231            MergeStats::format_memory(stats.sparse_bytes),
232        );
233
234        Ok((meta, stats))
235    }
236
237    /// Fallback merge with stats tracking
238    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
239        &self,
240        dir: &D,
241        segments: &[SegmentReader],
242        new_segment_id: SegmentId,
243    ) -> Result<(SegmentMeta, MergeStats)> {
244        let mut stats = MergeStats::default();
245
246        let mut builder =
247            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
248
249        for segment in segments {
250            for doc_id in 0..segment.num_docs() {
251                if let Some(doc) = segment.doc(doc_id).await? {
252                    builder.add_document(doc)?;
253                }
254
255                // Track memory periodically
256                if doc_id % 10000 == 0 {
257                    let builder_stats = builder.stats();
258                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
259                    stats.peak_memory_bytes =
260                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
261                }
262            }
263        }
264
265        let meta = builder.build(dir, new_segment_id).await?;
266        Ok((meta, stats))
267    }
268
269    /// Merge postings from multiple segments using streaming k-way merge
270    ///
271    /// This implementation uses a min-heap to merge terms from all segments
272    /// in sorted order without loading all terms into memory at once.
273    /// Memory usage is O(num_segments) instead of O(total_terms).
274    ///
275    /// Optimization: For terms that exist in only one segment, we copy the
276    /// posting data directly without decode/encode. Only terms that exist
277    /// in multiple segments need full merge.
278    ///
279    /// Returns the number of terms processed.
280    async fn merge_postings_with_stats(
281        &self,
282        segments: &[SegmentReader],
283        term_dict: &mut Vec<u8>,
284        postings_out: &mut Vec<u8>,
285        stats: &mut MergeStats,
286    ) -> Result<usize> {
287        // Calculate doc offsets for each segment
288        let mut doc_offsets = Vec::with_capacity(segments.len());
289        let mut offset = 0u32;
290        for segment in segments {
291            doc_offsets.push(offset);
292            offset += segment.num_docs();
293        }
294
295        // Create iterators for each segment's term dictionary
296        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();
297
298        // Initialize min-heap with first entry from each segment
299        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
300        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
301            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
302                heap.push(MergeEntry {
303                    key,
304                    term_info,
305                    segment_idx: seg_idx,
306                    doc_offset: doc_offsets[seg_idx],
307                });
308            }
309        }
310
311        // Buffer term results - needed because SSTableWriter can't be held across await points
312        // Memory is bounded by unique terms (typically much smaller than postings)
313        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
314        let mut terms_processed = 0usize;
315
316        while !heap.is_empty() {
317            // Get the smallest key
318            let first = heap.pop().unwrap();
319            let current_key = first.key.clone();
320
321            // Collect all entries with the same key
322            let mut sources: Vec<(usize, TermInfo, u32)> =
323                vec![(first.segment_idx, first.term_info, first.doc_offset)];
324
325            // Advance the iterator that provided this entry
326            if let Some((key, term_info)) = iterators[first.segment_idx]
327                .next()
328                .await
329                .map_err(crate::Error::from)?
330            {
331                heap.push(MergeEntry {
332                    key,
333                    term_info,
334                    segment_idx: first.segment_idx,
335                    doc_offset: doc_offsets[first.segment_idx],
336                });
337            }
338
339            // Check if other segments have the same key
340            while let Some(entry) = heap.peek() {
341                if entry.key != current_key {
342                    break;
343                }
344                let entry = heap.pop().unwrap();
345                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));
346
347                // Advance this iterator too
348                if let Some((key, term_info)) = iterators[entry.segment_idx]
349                    .next()
350                    .await
351                    .map_err(crate::Error::from)?
352                {
353                    heap.push(MergeEntry {
354                        key,
355                        term_info,
356                        segment_idx: entry.segment_idx,
357                        doc_offset: doc_offsets[entry.segment_idx],
358                    });
359                }
360            }
361
362            // Process this term
363            let term_info = if sources.len() == 1 {
364                // Optimization: Term exists in only one segment - copy directly
365                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
366                self.copy_term_posting(
367                    &segments[*seg_idx],
368                    source_info,
369                    *seg_doc_offset,
370                    postings_out,
371                )
372                .await?
373            } else {
374                // Term exists in multiple segments - need full merge
375                self.merge_term_postings(segments, &sources, postings_out)
376                    .await?
377            };
378
379            term_results.push((current_key, term_info));
380            terms_processed += 1;
381
382            // Log progress every 100k terms
383            if terms_processed.is_multiple_of(100_000) {
384                log::debug!("Merge progress: {} terms processed", terms_processed);
385            }
386        }
387
388        // Track memory
389        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
390        stats.current_memory_bytes = results_mem + postings_out.capacity();
391        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);
392
393        log::info!(
394            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={:.2} MB, peak={:.2} MB",
395            terms_processed,
396            segments.len(),
397            results_mem as f64 / (1024.0 * 1024.0),
398            postings_out.capacity() as f64 / (1024.0 * 1024.0),
399            stats.peak_memory_bytes as f64 / (1024.0 * 1024.0)
400        );
401
402        // Write to SSTable (sync, no await points)
403        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
404        for (key, term_info) in term_results {
405            writer.insert(&key, &term_info)?;
406        }
407        writer.finish()?;
408
409        Ok(terms_processed)
410    }
411
412    /// Copy a term's posting data directly from source segment (no decode/encode)
413    /// Only adjusts doc IDs by adding the segment's doc offset
414    async fn copy_term_posting(
415        &self,
416        segment: &SegmentReader,
417        source_info: &TermInfo,
418        doc_offset: u32,
419        postings_out: &mut Vec<u8>,
420    ) -> Result<TermInfo> {
421        // Handle inline postings - need to remap doc IDs
422        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
423            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
424            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
425                return Ok(inline);
426            }
427            // If can't inline after remapping (shouldn't happen), fall through to external
428            let mut pl = PostingList::with_capacity(remapped_ids.len());
429            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
430                pl.push(doc_id, tf);
431            }
432            let posting_offset = postings_out.len() as u64;
433            let block_list = BlockPostingList::from_posting_list(&pl)?;
434            let mut encoded = Vec::new();
435            block_list.serialize(&mut encoded)?;
436            postings_out.extend_from_slice(&encoded);
437            return Ok(TermInfo::external(
438                posting_offset,
439                encoded.len() as u32,
440                pl.doc_count(),
441            ));
442        }
443
444        // External posting - read, decode, remap doc IDs, re-encode
445        // Note: We can't just copy bytes because doc IDs are delta-encoded
446        let (offset, len) = source_info.external_info().unwrap();
447        let posting_bytes = segment.read_postings(offset, len).await?;
448        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
449
450        // Remap doc IDs
451        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
452        let mut iter = source_postings.iterator();
453        while iter.doc() != TERMINATED {
454            remapped.add(iter.doc() + doc_offset, iter.term_freq());
455            iter.advance();
456        }
457
458        // Try to inline if small enough
459        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
460        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();
461
462        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
463            return Ok(inline);
464        }
465
466        // Write to postings file
467        let posting_offset = postings_out.len() as u64;
468        let block_list = BlockPostingList::from_posting_list(&remapped)?;
469        let mut encoded = Vec::new();
470        block_list.serialize(&mut encoded)?;
471        postings_out.extend_from_slice(&encoded);
472
473        Ok(TermInfo::external(
474            posting_offset,
475            encoded.len() as u32,
476            remapped.doc_count(),
477        ))
478    }
479
480    /// Merge postings for a term that exists in multiple segments
481    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
482    async fn merge_term_postings(
483        &self,
484        segments: &[SegmentReader],
485        sources: &[(usize, TermInfo, u32)],
486        postings_out: &mut Vec<u8>,
487    ) -> Result<TermInfo> {
488        // Sort sources by doc_offset to ensure postings are added in sorted order
489        let mut sorted_sources: Vec<_> = sources.to_vec();
490        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);
491
492        // Check if all sources are external (can use block concatenation)
493        let all_external = sorted_sources
494            .iter()
495            .all(|(_, term_info, _)| term_info.external_info().is_some());
496
497        if all_external && sorted_sources.len() > 1 {
498            // Fast path: block-level concatenation
499            let mut block_sources = Vec::with_capacity(sorted_sources.len());
500
501            for (seg_idx, term_info, doc_offset) in &sorted_sources {
502                let segment = &segments[*seg_idx];
503                let (offset, len) = term_info.external_info().unwrap();
504                let posting_bytes = segment.read_postings(offset, len).await?;
505                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
506                block_sources.push((source_postings, *doc_offset));
507            }
508
509            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
510            let posting_offset = postings_out.len() as u64;
511            let mut encoded = Vec::new();
512            merged_blocks.serialize(&mut encoded)?;
513            postings_out.extend_from_slice(&encoded);
514
515            return Ok(TermInfo::external(
516                posting_offset,
517                encoded.len() as u32,
518                merged_blocks.doc_count(),
519            ));
520        }
521
522        // Slow path: full decode/encode for inline postings or single source
523        let mut merged = PostingList::new();
524
525        for (seg_idx, term_info, doc_offset) in &sorted_sources {
526            let segment = &segments[*seg_idx];
527
528            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
529                // Inline posting list
530                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
531                    merged.add(doc_id + doc_offset, tf);
532                }
533            } else {
534                // External posting list
535                let (offset, len) = term_info.external_info().unwrap();
536                let posting_bytes = segment.read_postings(offset, len).await?;
537                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
538
539                let mut iter = source_postings.iterator();
540                while iter.doc() != TERMINATED {
541                    merged.add(iter.doc() + doc_offset, iter.term_freq());
542                    iter.advance();
543                }
544            }
545        }
546
547        // Try to inline small posting lists
548        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
549        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
550
551        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
552            return Ok(inline);
553        }
554
555        // Write to postings file
556        let posting_offset = postings_out.len() as u64;
557        let block_list = BlockPostingList::from_posting_list(&merged)?;
558        let mut encoded = Vec::new();
559        block_list.serialize(&mut encoded)?;
560        postings_out.extend_from_slice(&encoded);
561
562        Ok(TermInfo::external(
563            posting_offset,
564            encoded.len() as u32,
565            merged.doc_count(),
566        ))
567    }
568    /// Merge dense vector indexes with stats tracking - returns output size in bytes
569    ///
570    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
571    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
572    /// For RaBitQ: Must rebuild with new centroid from all vectors
573    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
574        &self,
575        dir: &D,
576        segments: &[SegmentReader],
577        files: &SegmentFiles,
578    ) -> Result<usize> {
579        use byteorder::{LittleEndian, WriteBytesExt};
580
581        // (field_id, index_type, data)
582        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
583
584        for (field, entry) in self.schema.fields() {
585            if !matches!(entry.field_type, FieldType::DenseVector) {
586                continue;
587            }
588
589            // Check if all segments have ScaNN indexes for this field
590            let scann_indexes: Vec<_> = segments
591                .iter()
592                .filter_map(|s| s.get_scann_vector_index(field))
593                .collect();
594
595            if scann_indexes.len()
596                == segments
597                    .iter()
598                    .filter(|s| s.has_dense_vector_index(field))
599                    .count()
600                && !scann_indexes.is_empty()
601            {
602                // All segments have ScaNN - use O(1) cluster merge!
603                let refs: Vec<&crate::structures::IVFPQIndex> =
604                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
605
606                // Calculate doc_id offsets
607                let mut doc_offsets = Vec::with_capacity(segments.len());
608                let mut offset = 0u32;
609                for segment in segments {
610                    doc_offsets.push(offset);
611                    offset += segment.num_docs();
612                }
613
614                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
615                    Ok(merged) => {
616                        let bytes = merged
617                            .to_bytes()
618                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
619                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
620                        continue;
621                    }
622                    Err(e) => {
623                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
624                    }
625                }
626            }
627
628            // Check if all segments have IVF indexes for this field
629            let ivf_indexes: Vec<_> = segments
630                .iter()
631                .filter_map(|s| s.get_ivf_vector_index(field))
632                .collect();
633
634            if ivf_indexes.len()
635                == segments
636                    .iter()
637                    .filter(|s| s.has_dense_vector_index(field))
638                    .count()
639                && !ivf_indexes.is_empty()
640            {
641                // All segments have IVF - use O(1) cluster merge!
642                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
643                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
644
645                // Calculate doc_id offsets
646                let mut doc_offsets = Vec::with_capacity(segments.len());
647                let mut offset = 0u32;
648                for segment in segments {
649                    doc_offsets.push(offset);
650                    offset += segment.num_docs();
651                }
652
653                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
654                    Ok(merged) => {
655                        let bytes = merged
656                            .to_bytes()
657                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
658                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
659                        continue;
660                    }
661                    Err(e) => {
662                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
663                    }
664                }
665            }
666
667            // Fall back to RaBitQ rebuild (collect raw vectors)
668            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
669
670            for segment in segments {
671                if let Some(index) = segment.get_dense_vector_index(field)
672                    && let Some(raw_vecs) = &index.raw_vectors
673                {
674                    all_vectors.extend(raw_vecs.iter().cloned());
675                }
676            }
677
678            if !all_vectors.is_empty() {
679                let dim = all_vectors[0].len();
680                let config = RaBitQConfig::new(dim);
681                let merged_index = RaBitQIndex::build(config, &all_vectors, true);
682
683                let index_bytes = serde_json::to_vec(&merged_index)
684                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;
685
686                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
687            }
688        }
689
690        // Write unified vectors file with index_type
691        if !field_indexes.is_empty() {
692            field_indexes.sort_by_key(|(id, _, _)| *id);
693
694            // Header: num_fields + (field_id, index_type, offset, len) per field
695            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
696            let mut output = Vec::new();
697
698            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
699
700            let mut current_offset = header_size as u64;
701            for (field_id, index_type, data) in &field_indexes {
702                output.write_u32::<LittleEndian>(*field_id)?;
703                output.write_u8(*index_type)?;
704                output.write_u64::<LittleEndian>(current_offset)?;
705                output.write_u64::<LittleEndian>(data.len() as u64)?;
706                current_offset += data.len() as u64;
707            }
708
709            for (_, _, data) in field_indexes {
710                output.extend_from_slice(&data);
711            }
712
713            let output_size = output.len();
714            dir.write(&files.vectors, &output).await?;
715            return Ok(output_size);
716        }
717
718        Ok(0)
719    }
720
721    /// Merge sparse vector indexes using optimized block stacking
722    ///
723    /// This is O(blocks) instead of O(postings) - we stack blocks directly
724    /// and only adjust the first_doc_id in each block header by the doc offset.
725    /// Deltas within blocks remain unchanged since they're relative.
726    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
727        &self,
728        dir: &D,
729        segments: &[SegmentReader],
730        files: &SegmentFiles,
731    ) -> Result<usize> {
732        use crate::structures::BlockSparsePostingList;
733        use byteorder::{LittleEndian, WriteBytesExt};
734
735        // Calculate doc offsets for each segment
736        let mut doc_offsets = Vec::with_capacity(segments.len());
737        let mut offset = 0u32;
738        for (i, segment) in segments.iter().enumerate() {
739            log::debug!(
740                "Sparse merge: segment {} has {} docs, doc_offset={}",
741                i,
742                segment.num_docs(),
743                offset
744            );
745            doc_offsets.push(offset);
746            offset += segment.num_docs();
747        }
748
749        // Collect all sparse vector fields from schema
750        let sparse_fields: Vec<_> = self
751            .schema
752            .fields()
753            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
754            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
755            .collect();
756
757        if sparse_fields.is_empty() {
758            return Ok(0);
759        }
760
761        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> merged_posting_list)
762        type SparseFieldData = (
763            u32,
764            crate::structures::WeightQuantization,
765            u32,
766            FxHashMap<u32, Vec<u8>>,
767        );
768        let mut field_data: Vec<SparseFieldData> = Vec::new();
769
770        for (field, sparse_config) in &sparse_fields {
771            // Get quantization from config
772            let quantization = sparse_config
773                .as_ref()
774                .map(|c| c.weight_quantization)
775                .unwrap_or(crate::structures::WeightQuantization::Float32);
776
777            // Collect all dimensions across all segments for this field
778            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
779            for segment in segments {
780                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
781                    for dim_id in sparse_index.active_dimensions() {
782                        all_dims.insert(dim_id);
783                    }
784                }
785            }
786
787            if all_dims.is_empty() {
788                continue;
789            }
790
791            let max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
792            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
793
794            // For each dimension, merge posting lists from all segments
795            for dim_id in all_dims {
796                // Collect posting lists for this dimension from all segments
797                // Keep Arcs alive while we borrow from them
798                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();
799
800                for (seg_idx, segment) in segments.iter().enumerate() {
801                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
802                        && let Ok(Some(posting_list)) = sparse_index.get_posting_blocking(dim_id)
803                    {
804                        log::trace!(
805                            "Sparse merge dim={}: seg={} offset={} doc_count={} blocks={}",
806                            dim_id,
807                            seg_idx,
808                            doc_offsets[seg_idx],
809                            posting_list.doc_count(),
810                            posting_list.blocks.len()
811                        );
812                        posting_arcs.push((posting_list, doc_offsets[seg_idx]));
813                    }
814                }
815
816                // Create references for merge
817                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
818                    .iter()
819                    .map(|(pl, offset)| (pl.as_ref(), *offset))
820                    .collect();
821
822                if lists_with_offsets.is_empty() {
823                    continue;
824                }
825
826                // Merge using optimized block stacking
827                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);
828
829                log::trace!(
830                    "Sparse merge dim={}: merged {} lists -> doc_count={} blocks={}",
831                    dim_id,
832                    lists_with_offsets.len(),
833                    merged.doc_count(),
834                    merged.blocks.len()
835                );
836
837                // Serialize
838                let mut bytes = Vec::new();
839                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
840                dim_bytes.insert(dim_id, bytes);
841            }
842
843            field_data.push((field.0, quantization, max_dim_id + 1, dim_bytes));
844        }
845
846        if field_data.is_empty() {
847            return Ok(0);
848        }
849
850        // Sort by field_id
851        field_data.sort_by_key(|(id, _, _, _)| *id);
852
853        // Build sparse file (same format as builder)
854        // Header: num_fields (4)
855        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
856        let mut header_size = 4u64;
857        for (_, _, max_dim_id, _) in &field_data {
858            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
859            header_size += (*max_dim_id as u64) * 12; // table entries
860        }
861
862        let mut output = Vec::new();
863        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
864
865        let mut current_offset = header_size;
866        let mut all_data: Vec<u8> = Vec::new();
867        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
868
869        for (_, _, max_dim_id, dim_bytes) in &field_data {
870            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
871
872            for dim_id in 0..*max_dim_id {
873                if let Some(bytes) = dim_bytes.get(&dim_id) {
874                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
875                    current_offset += bytes.len() as u64;
876                    all_data.extend_from_slice(bytes);
877                }
878            }
879            field_tables.push(table);
880        }
881
882        // Write field headers and tables
883        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
884            output.write_u32::<LittleEndian>(*field_id)?;
885            output.write_u8(*quantization as u8)?;
886            output.write_u32::<LittleEndian>(*max_dim_id)?;
887
888            for &(offset, length) in &field_tables[i] {
889                output.write_u64::<LittleEndian>(offset)?;
890                output.write_u32::<LittleEndian>(length)?;
891            }
892        }
893
894        // Append all data
895        output.extend_from_slice(&all_data);
896
897        let output_size = output.len();
898        dir.write(&files.sparse, &output).await?;
899
900        log::info!(
901            "Sparse vector merge complete: {} fields, {} bytes",
902            field_data.len(),
903            output_size
904        );
905
906        Ok(output_size)
907    }
908}
909
910/// Trained vector index structures for rebuilding segments with ANN indexes
911pub struct TrainedVectorStructures {
912    /// Trained centroids per field_id
913    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
914    /// Trained PQ codebooks per field_id (for ScaNN)
915    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
916}
917
918impl SegmentMerger {
919    /// Merge segments and rebuild dense vectors with ANN indexes using trained structures
920    ///
921    /// This is called after centroids/codebooks are trained at index level.
922    /// It collects Flat vectors from all segments and builds IVF-RaBitQ or ScaNN indexes.
923    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
924        &self,
925        dir: &D,
926        segments: &[SegmentReader],
927        new_segment_id: SegmentId,
928        trained: &TrainedVectorStructures,
929    ) -> Result<SegmentMeta> {
930        let files = SegmentFiles::new(new_segment_id.0);
931
932        // Build merged term dictionary and postings
933        let mut term_dict_data = Vec::new();
934        let mut postings_data = Vec::new();
935        let mut stats = MergeStats::default();
936        self.merge_postings_with_stats(
937            segments,
938            &mut term_dict_data,
939            &mut postings_data,
940            &mut stats,
941        )
942        .await?;
943
944        // Stack store files
945        let mut store_data = Vec::new();
946        {
947            let mut store_merger = StoreMerger::new(&mut store_data);
948            for segment in segments {
949                let raw_blocks = segment.store_raw_blocks();
950                let data_slice = segment.store_data_slice();
951                store_merger.append_store(data_slice, &raw_blocks).await?;
952            }
953            store_merger.finish()?;
954        }
955
956        // Write text index files
957        dir.write(&files.term_dict, &term_dict_data).await?;
958        dir.write(&files.postings, &postings_data).await?;
959        dir.write(&files.store, &store_data).await?;
960
961        drop(term_dict_data);
962        drop(postings_data);
963        drop(store_data);
964
965        // Build ANN indexes using trained centroids
966        let vectors_bytes = self
967            .build_ann_vectors(dir, segments, &files, trained)
968            .await?;
969
970        // Merge field stats
971        let mut merged_field_stats: rustc_hash::FxHashMap<u32, FieldStats> =
972            rustc_hash::FxHashMap::default();
973        for segment in segments {
974            for (&field_id, field_stats) in &segment.meta().field_stats {
975                let entry = merged_field_stats.entry(field_id).or_default();
976                entry.total_tokens += field_stats.total_tokens;
977                entry.doc_count += field_stats.doc_count;
978            }
979        }
980
981        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
982        let meta = SegmentMeta {
983            id: new_segment_id.0,
984            num_docs: total_docs,
985            field_stats: merged_field_stats,
986        };
987
988        dir.write(&files.meta, &meta.serialize()?).await?;
989
990        log::info!(
991            "ANN merge complete: {} docs, vectors={}",
992            total_docs,
993            MergeStats::format_memory(vectors_bytes)
994        );
995
996        Ok(meta)
997    }
998
999    /// Build ANN indexes from Flat vectors using trained centroids
1000    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
1001        &self,
1002        dir: &D,
1003        segments: &[SegmentReader],
1004        files: &SegmentFiles,
1005        trained: &TrainedVectorStructures,
1006    ) -> Result<usize> {
1007        use crate::dsl::VectorIndexType;
1008        use byteorder::{LittleEndian, WriteBytesExt};
1009
1010        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
1011
1012        for (field, entry) in self.schema.fields() {
1013            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
1014                continue;
1015            }
1016
1017            let config = match &entry.dense_vector_config {
1018                Some(c) => c,
1019                None => continue,
1020            };
1021
1022            // Collect all Flat vectors from segments
1023            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
1024            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
1025            let mut doc_offset = 0u32;
1026
1027            for segment in segments {
1028                if let Some(super::VectorIndex::Flat(flat_data)) =
1029                    segment.vector_indexes().get(&field.0)
1030                {
1031                    for (vec, (local_doc_id, ordinal)) in
1032                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
1033                    {
1034                        all_vectors.push(vec.clone());
1035                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
1036                    }
1037                }
1038                doc_offset += segment.num_docs();
1039            }
1040
1041            if all_vectors.is_empty() {
1042                continue;
1043            }
1044
1045            let dim = config.index_dim();
1046
1047            // Extract just doc_ids for ANN indexes (they don't track ordinals yet)
1048            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();
1049
1050            // Build ANN index based on index type and available trained structures
1051            match config.index_type {
1052                VectorIndexType::IvfRaBitQ => {
1053                    if let Some(centroids) = trained.centroids.get(&field.0) {
1054                        // Create RaBitQ codebook for the dimension
1055                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
1056                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
1057
1058                        // Build IVF-RaBitQ index
1059                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
1060                            .with_store_raw(config.store_raw);
1061                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
1062                            ivf_config,
1063                            centroids,
1064                            &codebook,
1065                            &all_vectors,
1066                            Some(&ann_doc_ids),
1067                        );
1068
1069                        let index_data = super::builder::IVFRaBitQIndexData {
1070                            centroids: (**centroids).clone(),
1071                            codebook,
1072                            index: ivf_index,
1073                        };
1074                        let bytes = index_data
1075                            .to_bytes()
1076                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1077                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
1078
1079                        log::info!(
1080                            "Built IVF-RaBitQ index for field {} with {} vectors",
1081                            field.0,
1082                            all_vectors.len()
1083                        );
1084                        continue;
1085                    }
1086                }
1087                VectorIndexType::ScaNN => {
1088                    if let (Some(centroids), Some(codebook)) = (
1089                        trained.centroids.get(&field.0),
1090                        trained.codebooks.get(&field.0),
1091                    ) {
1092                        // Build ScaNN (IVF-PQ) index
1093                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
1094                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
1095                            ivf_pq_config,
1096                            centroids,
1097                            codebook,
1098                            &all_vectors,
1099                            Some(&ann_doc_ids),
1100                        );
1101
1102                        let index_data = super::builder::ScaNNIndexData {
1103                            centroids: (**centroids).clone(),
1104                            codebook: (**codebook).clone(),
1105                            index: ivf_pq_index,
1106                        };
1107                        let bytes = index_data
1108                            .to_bytes()
1109                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1110                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
1111
1112                        log::info!(
1113                            "Built ScaNN index for field {} with {} vectors",
1114                            field.0,
1115                            all_vectors.len()
1116                        );
1117                        continue;
1118                    }
1119                }
1120                _ => {}
1121            }
1122
1123            // Fallback: keep as Flat if no trained structures available
1124            let flat_data = super::builder::FlatVectorData {
1125                dim,
1126                vectors: all_vectors,
1127                doc_ids: all_doc_ids,
1128            };
1129            let bytes = serde_json::to_vec(&flat_data)
1130                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1131            field_indexes.push((field.0, 3u8, bytes)); // 3 = Flat
1132        }
1133
1134        // Write vectors file
1135        if !field_indexes.is_empty() {
1136            field_indexes.sort_by_key(|(id, _, _)| *id);
1137
1138            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
1139            let mut output = Vec::new();
1140
1141            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
1142
1143            let mut current_offset = header_size as u64;
1144            for (field_id, index_type, data) in &field_indexes {
1145                output.write_u32::<LittleEndian>(*field_id)?;
1146                output.write_u8(*index_type)?;
1147                output.write_u64::<LittleEndian>(current_offset)?;
1148                output.write_u64::<LittleEndian>(data.len() as u64)?;
1149                current_offset += data.len() as u64;
1150            }
1151
1152            for (_, _, data) in field_indexes {
1153                output.extend_from_slice(&data);
1154            }
1155
1156            let output_size = output.len();
1157            dir.write(&files.vectors, &output).await?;
1158            return Ok(output_size);
1159        }
1160
1161        Ok(0)
1162    }
1163}
1164
1165/// Delete segment files from directory
1166pub async fn delete_segment<D: Directory + DirectoryWriter>(
1167    dir: &D,
1168    segment_id: SegmentId,
1169) -> Result<()> {
1170    let files = SegmentFiles::new(segment_id.0);
1171    let _ = dir.delete(&files.term_dict).await;
1172    let _ = dir.delete(&files.postings).await;
1173    let _ = dir.delete(&files.store).await;
1174    let _ = dir.delete(&files.meta).await;
1175    let _ = dir.delete(&files.vectors).await;
1176    Ok(())
1177}