Skip to main content

hermes_core/segment/
merger.rs

1//! Segment merger for combining multiple segments
2
3use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use rustc_hash::FxHashMap;
8
9use super::builder::{SegmentBuilder, SegmentBuilderConfig};
10use super::reader::SegmentReader;
11use super::store::StoreMerger;
12use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
13use crate::Result;
14use crate::directories::{Directory, DirectoryWriter};
15use crate::dsl::{FieldType, Schema};
16use crate::structures::{
17    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
18};
19
/// Counters and size estimates collected while merging segments.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Number of postings merged
    pub postings_merged: usize,
    /// Highest estimated memory usage seen during the merge, in bytes
    pub peak_memory_bytes: usize,
    /// Most recently sampled estimated memory usage, in bytes
    pub current_memory_bytes: usize,
    /// Size of the written term dictionary, in bytes
    pub term_dict_bytes: usize,
    /// Size of the written postings data, in bytes
    pub postings_bytes: usize,
    /// Size of the written document store, in bytes
    pub store_bytes: usize,
    /// Size of the written dense vector index file, in bytes
    pub vectors_bytes: usize,
    /// Size of the written sparse vector index data, in bytes
    pub sparse_bytes: usize,
}

impl MergeStats {
    /// Render a byte count as a short human-readable string.
    ///
    /// Uses binary units (1 KB = 1024 B). Values of at least one KB are
    /// printed with two decimals; smaller values are printed as whole bytes.
    pub fn format_memory(bytes: usize) -> String {
        const KB: usize = 1024;
        const MB: usize = KB * 1024;
        const GB: usize = MB * 1024;

        match bytes {
            b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
            b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
            b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
            b => format!("{} B", b),
        }
    }
}
57
58/// Entry for k-way merge heap
59struct MergeEntry {
60    key: Vec<u8>,
61    term_info: TermInfo,
62    segment_idx: usize,
63    doc_offset: u32,
64}
65
66impl PartialEq for MergeEntry {
67    fn eq(&self, other: &Self) -> bool {
68        self.key == other.key
69    }
70}
71
72impl Eq for MergeEntry {}
73
74impl PartialOrd for MergeEntry {
75    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76        Some(self.cmp(other))
77    }
78}
79
80impl Ord for MergeEntry {
81    fn cmp(&self, other: &Self) -> Ordering {
82        // Reverse order for min-heap (BinaryHeap is max-heap by default)
83        other.key.cmp(&self.key)
84    }
85}
86
87/// Segment merger - merges multiple segments into one new segment.
///
/// The merge routines consult the schema to find fields with positions,
/// dense vectors and sparse vectors.
88pub struct SegmentMerger {
    // Reference-counted schema handle supplied to `new`.
89    schema: Arc<Schema>,
90}
91
92impl SegmentMerger {
93    pub fn new(schema: Arc<Schema>) -> Self {
94        Self { schema }
95    }
96
97    /// Merge segments - uses optimized store stacking when possible
98    pub async fn merge<D: Directory + DirectoryWriter>(
99        &self,
100        dir: &D,
101        segments: &[SegmentReader],
102        new_segment_id: SegmentId,
103    ) -> Result<SegmentMeta> {
104        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
105        Ok(meta)
106    }
107
108    /// Merge segments with memory tracking - returns merge statistics
109    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
110        &self,
111        dir: &D,
112        segments: &[SegmentReader],
113        new_segment_id: SegmentId,
114    ) -> Result<(SegmentMeta, MergeStats)> {
115        // Check if we can use optimized store stacking (no dictionaries)
116        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
117
118        // Check if any segment has positions - if so, use rebuild merge
119        // (positions require doc ID remapping which optimized merge doesn't handle)
120        let has_positions = self
121            .schema
122            .fields()
123            .any(|(_, entry)| entry.positions.is_some());
124
125        // Note: sparse vectors now use optimized block stacking merge (no rebuild needed)
126        if can_stack_stores && !has_positions {
127            self.merge_optimized_with_stats(dir, segments, new_segment_id)
128                .await
129        } else {
130            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
131                .await
132        }
133    }
134
135    /// Optimized merge with stats tracking
136    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
137        &self,
138        dir: &D,
139        segments: &[SegmentReader],
140        new_segment_id: SegmentId,
141    ) -> Result<(SegmentMeta, MergeStats)> {
142        let mut stats = MergeStats::default();
143        let files = SegmentFiles::new(new_segment_id.0);
144
145        // Build merged term dictionary and postings
146        let mut term_dict_data = Vec::new();
147        let mut postings_data = Vec::new();
148        let terms_processed = self
149            .merge_postings_with_stats(
150                segments,
151                &mut term_dict_data,
152                &mut postings_data,
153                &mut stats,
154            )
155            .await?;
156        stats.terms_processed = terms_processed;
157        stats.term_dict_bytes = term_dict_data.len();
158        stats.postings_bytes = postings_data.len();
159
160        // Track peak memory (term dict + postings buffers)
161        let current_mem = term_dict_data.capacity() + postings_data.capacity();
162        stats.current_memory_bytes = current_mem;
163        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
164
165        // Stack store files without recompression
166        let mut store_data = Vec::new();
167        {
168            let mut store_merger = StoreMerger::new(&mut store_data);
169            for segment in segments {
170                let raw_blocks = segment.store_raw_blocks();
171                let data_slice = segment.store_data_slice();
172                store_merger.append_store(data_slice, &raw_blocks).await?;
173            }
174            store_merger.finish()?;
175        }
176        stats.store_bytes = store_data.len();
177
178        // Update peak memory
179        let current_mem =
180            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
181        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);
182
183        // Write files
184        dir.write(&files.term_dict, &term_dict_data).await?;
185        dir.write(&files.postings, &postings_data).await?;
186        dir.write(&files.store, &store_data).await?;
187
188        // Free memory after writing
189        drop(term_dict_data);
190        drop(postings_data);
191        drop(store_data);
192
193        // Merge dense vector indexes
194        let vectors_bytes = self
195            .merge_dense_vectors_with_stats(dir, segments, &files)
196            .await?;
197        stats.vectors_bytes = vectors_bytes;
198
199        // Merge sparse vector indexes (optimized block stacking)
200        let sparse_bytes = self
201            .merge_sparse_vectors_optimized(dir, segments, &files)
202            .await?;
203        stats.sparse_bytes = sparse_bytes;
204
205        // Merge field stats
206        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
207        for segment in segments {
208            for (&field_id, field_stats) in &segment.meta().field_stats {
209                let entry = merged_field_stats.entry(field_id).or_default();
210                entry.total_tokens += field_stats.total_tokens;
211                entry.doc_count += field_stats.doc_count;
212            }
213        }
214
215        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
216        let meta = SegmentMeta {
217            id: new_segment_id.0,
218            num_docs: total_docs,
219            field_stats: merged_field_stats,
220        };
221
222        dir.write(&files.meta, &meta.serialize()?).await?;
223
224        log::info!(
225            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}, sparse={}",
226            stats.terms_processed,
227            MergeStats::format_memory(stats.term_dict_bytes),
228            MergeStats::format_memory(stats.postings_bytes),
229            MergeStats::format_memory(stats.store_bytes),
230            MergeStats::format_memory(stats.vectors_bytes),
231            MergeStats::format_memory(stats.sparse_bytes),
232        );
233
234        Ok((meta, stats))
235    }
236
237    /// Fallback merge with stats tracking
238    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
239        &self,
240        dir: &D,
241        segments: &[SegmentReader],
242        new_segment_id: SegmentId,
243    ) -> Result<(SegmentMeta, MergeStats)> {
244        let mut stats = MergeStats::default();
245
246        let mut builder =
247            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
248
249        for segment in segments {
250            for doc_id in 0..segment.num_docs() {
251                if let Some(doc) = segment.doc(doc_id).await? {
252                    builder.add_document(doc)?;
253                }
254
255                // Track memory periodically
256                if doc_id % 10000 == 0 {
257                    let builder_stats = builder.stats();
258                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
259                    stats.peak_memory_bytes =
260                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
261                }
262            }
263        }
264
265        let meta = builder.build(dir, new_segment_id).await?;
266        Ok((meta, stats))
267    }
268
269    /// Merge postings from multiple segments using streaming k-way merge
270    ///
271    /// This implementation uses a min-heap to merge terms from all segments
272    /// in sorted order without loading all terms into memory at once.
273    /// Memory usage is O(num_segments) instead of O(total_terms).
274    ///
275    /// Optimization: For terms that exist in only one segment, we copy the
276    /// posting data directly without decode/encode. Only terms that exist
277    /// in multiple segments need full merge.
278    ///
279    /// Returns the number of terms processed.
280    async fn merge_postings_with_stats(
281        &self,
282        segments: &[SegmentReader],
283        term_dict: &mut Vec<u8>,
284        postings_out: &mut Vec<u8>,
285        stats: &mut MergeStats,
286    ) -> Result<usize> {
287        // Calculate doc offsets for each segment
288        let mut doc_offsets = Vec::with_capacity(segments.len());
289        let mut offset = 0u32;
290        for segment in segments {
291            doc_offsets.push(offset);
292            offset += segment.num_docs();
293        }
294
295        // Create iterators for each segment's term dictionary
296        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();
297
298        // Initialize min-heap with first entry from each segment
299        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
300        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
301            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
302                heap.push(MergeEntry {
303                    key,
304                    term_info,
305                    segment_idx: seg_idx,
306                    doc_offset: doc_offsets[seg_idx],
307                });
308            }
309        }
310
311        // Collect results for SSTable writing (need to buffer due to async)
312        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
313        let mut terms_processed = 0usize;
314
315        while !heap.is_empty() {
316            // Get the smallest key
317            let first = heap.pop().unwrap();
318            let current_key = first.key.clone();
319
320            // Collect all entries with the same key
321            let mut sources: Vec<(usize, TermInfo, u32)> =
322                vec![(first.segment_idx, first.term_info, first.doc_offset)];
323
324            // Advance the iterator that provided this entry
325            if let Some((key, term_info)) = iterators[first.segment_idx]
326                .next()
327                .await
328                .map_err(crate::Error::from)?
329            {
330                heap.push(MergeEntry {
331                    key,
332                    term_info,
333                    segment_idx: first.segment_idx,
334                    doc_offset: doc_offsets[first.segment_idx],
335                });
336            }
337
338            // Check if other segments have the same key
339            while let Some(entry) = heap.peek() {
340                if entry.key != current_key {
341                    break;
342                }
343                let entry = heap.pop().unwrap();
344                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));
345
346                // Advance this iterator too
347                if let Some((key, term_info)) = iterators[entry.segment_idx]
348                    .next()
349                    .await
350                    .map_err(crate::Error::from)?
351                {
352                    heap.push(MergeEntry {
353                        key,
354                        term_info,
355                        segment_idx: entry.segment_idx,
356                        doc_offset: doc_offsets[entry.segment_idx],
357                    });
358                }
359            }
360
361            // Process this term
362            let term_info = if sources.len() == 1 {
363                // Optimization: Term exists in only one segment - copy directly
364                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
365                self.copy_term_posting(
366                    &segments[*seg_idx],
367                    source_info,
368                    *seg_doc_offset,
369                    postings_out,
370                )
371                .await?
372            } else {
373                // Term exists in multiple segments - need full merge
374                self.merge_term_postings(segments, &sources, postings_out)
375                    .await?
376            };
377
378            term_results.push((current_key, term_info));
379            terms_processed += 1;
380
381            // Log progress every 100k terms
382            if terms_processed.is_multiple_of(100_000) {
383                log::debug!("Merge progress: {} terms processed", terms_processed);
384            }
385        }
386
387        log::info!(
388            "Merge complete: {} terms processed from {} segments",
389            terms_processed,
390            segments.len()
391        );
392
393        // Track memory for term_results buffer
394        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
395        stats.current_memory_bytes = results_mem + postings_out.capacity();
396        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);
397
398        // Write to SSTable (sync, no await points)
399        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
400        for (key, term_info) in term_results {
401            writer.insert(&key, &term_info)?;
402        }
403        writer.finish()?;
404
405        Ok(terms_processed)
406    }
407
408    /// Copy a term's posting data directly from source segment (no decode/encode)
409    /// Only adjusts doc IDs by adding the segment's doc offset
410    async fn copy_term_posting(
411        &self,
412        segment: &SegmentReader,
413        source_info: &TermInfo,
414        doc_offset: u32,
415        postings_out: &mut Vec<u8>,
416    ) -> Result<TermInfo> {
417        // Handle inline postings - need to remap doc IDs
418        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
419            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
420            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
421                return Ok(inline);
422            }
423            // If can't inline after remapping (shouldn't happen), fall through to external
424            let mut pl = PostingList::with_capacity(remapped_ids.len());
425            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
426                pl.push(doc_id, tf);
427            }
428            let posting_offset = postings_out.len() as u64;
429            let block_list = BlockPostingList::from_posting_list(&pl)?;
430            let mut encoded = Vec::new();
431            block_list.serialize(&mut encoded)?;
432            postings_out.extend_from_slice(&encoded);
433            return Ok(TermInfo::external(
434                posting_offset,
435                encoded.len() as u32,
436                pl.doc_count(),
437            ));
438        }
439
440        // External posting - read, decode, remap doc IDs, re-encode
441        // Note: We can't just copy bytes because doc IDs are delta-encoded
442        let (offset, len) = source_info.external_info().unwrap();
443        let posting_bytes = segment.read_postings(offset, len).await?;
444        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
445
446        // Remap doc IDs
447        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
448        let mut iter = source_postings.iterator();
449        while iter.doc() != TERMINATED {
450            remapped.add(iter.doc() + doc_offset, iter.term_freq());
451            iter.advance();
452        }
453
454        // Try to inline if small enough
455        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
456        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();
457
458        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
459            return Ok(inline);
460        }
461
462        // Write to postings file
463        let posting_offset = postings_out.len() as u64;
464        let block_list = BlockPostingList::from_posting_list(&remapped)?;
465        let mut encoded = Vec::new();
466        block_list.serialize(&mut encoded)?;
467        postings_out.extend_from_slice(&encoded);
468
469        Ok(TermInfo::external(
470            posting_offset,
471            encoded.len() as u32,
472            remapped.doc_count(),
473        ))
474    }
475
476    /// Merge postings for a term that exists in multiple segments
477    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
478    async fn merge_term_postings(
479        &self,
480        segments: &[SegmentReader],
481        sources: &[(usize, TermInfo, u32)],
482        postings_out: &mut Vec<u8>,
483    ) -> Result<TermInfo> {
484        // Sort sources by doc_offset to ensure postings are added in sorted order
485        let mut sorted_sources: Vec<_> = sources.to_vec();
486        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);
487
488        // Check if all sources are external (can use block concatenation)
489        let all_external = sorted_sources
490            .iter()
491            .all(|(_, term_info, _)| term_info.external_info().is_some());
492
493        if all_external && sorted_sources.len() > 1 {
494            // Fast path: block-level concatenation
495            let mut block_sources = Vec::with_capacity(sorted_sources.len());
496
497            for (seg_idx, term_info, doc_offset) in &sorted_sources {
498                let segment = &segments[*seg_idx];
499                let (offset, len) = term_info.external_info().unwrap();
500                let posting_bytes = segment.read_postings(offset, len).await?;
501                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
502                block_sources.push((source_postings, *doc_offset));
503            }
504
505            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
506            let posting_offset = postings_out.len() as u64;
507            let mut encoded = Vec::new();
508            merged_blocks.serialize(&mut encoded)?;
509            postings_out.extend_from_slice(&encoded);
510
511            return Ok(TermInfo::external(
512                posting_offset,
513                encoded.len() as u32,
514                merged_blocks.doc_count(),
515            ));
516        }
517
518        // Slow path: full decode/encode for inline postings or single source
519        let mut merged = PostingList::new();
520
521        for (seg_idx, term_info, doc_offset) in &sorted_sources {
522            let segment = &segments[*seg_idx];
523
524            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
525                // Inline posting list
526                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
527                    merged.add(doc_id + doc_offset, tf);
528                }
529            } else {
530                // External posting list
531                let (offset, len) = term_info.external_info().unwrap();
532                let posting_bytes = segment.read_postings(offset, len).await?;
533                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
534
535                let mut iter = source_postings.iterator();
536                while iter.doc() != TERMINATED {
537                    merged.add(iter.doc() + doc_offset, iter.term_freq());
538                    iter.advance();
539                }
540            }
541        }
542
543        // Try to inline small posting lists
544        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
545        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
546
547        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
548            return Ok(inline);
549        }
550
551        // Write to postings file
552        let posting_offset = postings_out.len() as u64;
553        let block_list = BlockPostingList::from_posting_list(&merged)?;
554        let mut encoded = Vec::new();
555        block_list.serialize(&mut encoded)?;
556        postings_out.extend_from_slice(&encoded);
557
558        Ok(TermInfo::external(
559            posting_offset,
560            encoded.len() as u32,
561            merged.doc_count(),
562        ))
563    }
564    /// Merge dense vector indexes with stats tracking - returns output size in bytes
565    ///
566    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
567    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
568    /// For RaBitQ: Must rebuild with new centroid from all vectors
569    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
570        &self,
571        dir: &D,
572        segments: &[SegmentReader],
573        files: &SegmentFiles,
574    ) -> Result<usize> {
575        use byteorder::{LittleEndian, WriteBytesExt};
576
577        // (field_id, index_type, data)
578        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
579
580        for (field, entry) in self.schema.fields() {
581            if !matches!(entry.field_type, FieldType::DenseVector) {
582                continue;
583            }
584
585            // Check if all segments have ScaNN indexes for this field
586            let scann_indexes: Vec<_> = segments
587                .iter()
588                .filter_map(|s| s.get_scann_vector_index(field))
589                .collect();
590
591            if scann_indexes.len()
592                == segments
593                    .iter()
594                    .filter(|s| s.has_dense_vector_index(field))
595                    .count()
596                && !scann_indexes.is_empty()
597            {
598                // All segments have ScaNN - use O(1) cluster merge!
599                let refs: Vec<&crate::structures::IVFPQIndex> =
600                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
601
602                // Calculate doc_id offsets
603                let mut doc_offsets = Vec::with_capacity(segments.len());
604                let mut offset = 0u32;
605                for segment in segments {
606                    doc_offsets.push(offset);
607                    offset += segment.num_docs();
608                }
609
610                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
611                    Ok(merged) => {
612                        let bytes = merged
613                            .to_bytes()
614                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
615                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
616                        continue;
617                    }
618                    Err(e) => {
619                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
620                    }
621                }
622            }
623
624            // Check if all segments have IVF indexes for this field
625            let ivf_indexes: Vec<_> = segments
626                .iter()
627                .filter_map(|s| s.get_ivf_vector_index(field))
628                .collect();
629
630            if ivf_indexes.len()
631                == segments
632                    .iter()
633                    .filter(|s| s.has_dense_vector_index(field))
634                    .count()
635                && !ivf_indexes.is_empty()
636            {
637                // All segments have IVF - use O(1) cluster merge!
638                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
639                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
640
641                // Calculate doc_id offsets
642                let mut doc_offsets = Vec::with_capacity(segments.len());
643                let mut offset = 0u32;
644                for segment in segments {
645                    doc_offsets.push(offset);
646                    offset += segment.num_docs();
647                }
648
649                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
650                    Ok(merged) => {
651                        let bytes = merged
652                            .to_bytes()
653                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
654                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
655                        continue;
656                    }
657                    Err(e) => {
658                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
659                    }
660                }
661            }
662
663            // Fall back to RaBitQ rebuild (collect raw vectors)
664            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
665
666            for segment in segments {
667                if let Some(index) = segment.get_dense_vector_index(field)
668                    && let Some(raw_vecs) = &index.raw_vectors
669                {
670                    all_vectors.extend(raw_vecs.iter().cloned());
671                }
672            }
673
674            if !all_vectors.is_empty() {
675                let dim = all_vectors[0].len();
676                let config = RaBitQConfig::new(dim);
677                let merged_index = RaBitQIndex::build(config, &all_vectors, true);
678
679                let index_bytes = serde_json::to_vec(&merged_index)
680                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;
681
682                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
683            }
684        }
685
686        // Write unified vectors file with index_type
687        if !field_indexes.is_empty() {
688            field_indexes.sort_by_key(|(id, _, _)| *id);
689
690            // Header: num_fields + (field_id, index_type, offset, len) per field
691            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
692            let mut output = Vec::new();
693
694            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
695
696            let mut current_offset = header_size as u64;
697            for (field_id, index_type, data) in &field_indexes {
698                output.write_u32::<LittleEndian>(*field_id)?;
699                output.write_u8(*index_type)?;
700                output.write_u64::<LittleEndian>(current_offset)?;
701                output.write_u64::<LittleEndian>(data.len() as u64)?;
702                current_offset += data.len() as u64;
703            }
704
705            for (_, _, data) in field_indexes {
706                output.extend_from_slice(&data);
707            }
708
709            let output_size = output.len();
710            dir.write(&files.vectors, &output).await?;
711            return Ok(output_size);
712        }
713
714        Ok(0)
715    }
716
717    /// Merge sparse vector indexes using optimized block stacking
718    ///
719    /// This is O(blocks) instead of O(postings) - we stack blocks directly
720    /// and only adjust the first_doc_id in each block header by the doc offset.
721    /// Deltas within blocks remain unchanged since they're relative.
722    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
723        &self,
724        dir: &D,
725        segments: &[SegmentReader],
726        files: &SegmentFiles,
727    ) -> Result<usize> {
728        use crate::structures::BlockSparsePostingList;
729        use byteorder::{LittleEndian, WriteBytesExt};
730
731        // Calculate doc offsets for each segment
732        let mut doc_offsets = Vec::with_capacity(segments.len());
733        let mut offset = 0u32;
734        for (i, segment) in segments.iter().enumerate() {
735            log::debug!(
736                "Sparse merge: segment {} has {} docs, doc_offset={}",
737                i,
738                segment.num_docs(),
739                offset
740            );
741            doc_offsets.push(offset);
742            offset += segment.num_docs();
743        }
744
745        // Collect all sparse vector fields from schema
746        let sparse_fields: Vec<_> = self
747            .schema
748            .fields()
749            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
750            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
751            .collect();
752
753        if sparse_fields.is_empty() {
754            return Ok(0);
755        }
756
757        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> merged_posting_list)
758        type SparseFieldData = (
759            u32,
760            crate::structures::WeightQuantization,
761            u32,
762            FxHashMap<u32, Vec<u8>>,
763        );
764        let mut field_data: Vec<SparseFieldData> = Vec::new();
765
766        for (field, sparse_config) in &sparse_fields {
767            // Get quantization from config
768            let quantization = sparse_config
769                .as_ref()
770                .map(|c| c.weight_quantization)
771                .unwrap_or(crate::structures::WeightQuantization::Float32);
772
773            // Collect all dimensions across all segments for this field
774            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
775            for segment in segments {
776                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
777                    for (dim_id, posting) in sparse_index.postings.iter().enumerate() {
778                        if posting.is_some() {
779                            all_dims.insert(dim_id as u32);
780                        }
781                    }
782                }
783            }
784
785            if all_dims.is_empty() {
786                continue;
787            }
788
789            let max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
790            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
791
792            // For each dimension, merge posting lists from all segments
793            for dim_id in all_dims {
794                // Collect posting lists for this dimension from all segments
795                let mut lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = Vec::new();
796
797                for (seg_idx, segment) in segments.iter().enumerate() {
798                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
799                        && let Some(Some(posting_list)) = sparse_index.postings.get(dim_id as usize)
800                    {
801                        log::trace!(
802                            "Sparse merge dim={}: seg={} offset={} doc_count={} blocks={}",
803                            dim_id,
804                            seg_idx,
805                            doc_offsets[seg_idx],
806                            posting_list.doc_count(),
807                            posting_list.blocks.len()
808                        );
809                        lists_with_offsets.push((posting_list.as_ref(), doc_offsets[seg_idx]));
810                    }
811                }
812
813                if lists_with_offsets.is_empty() {
814                    continue;
815                }
816
817                // Merge using optimized block stacking
818                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);
819
820                log::trace!(
821                    "Sparse merge dim={}: merged {} lists -> doc_count={} blocks={}",
822                    dim_id,
823                    lists_with_offsets.len(),
824                    merged.doc_count(),
825                    merged.blocks.len()
826                );
827
828                // Serialize
829                let mut bytes = Vec::new();
830                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
831                dim_bytes.insert(dim_id, bytes);
832            }
833
834            field_data.push((field.0, quantization, max_dim_id + 1, dim_bytes));
835        }
836
837        if field_data.is_empty() {
838            return Ok(0);
839        }
840
841        // Sort by field_id
842        field_data.sort_by_key(|(id, _, _, _)| *id);
843
844        // Build sparse file (same format as builder)
845        // Header: num_fields (4)
846        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
847        let mut header_size = 4u64;
848        for (_, _, max_dim_id, _) in &field_data {
849            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
850            header_size += (*max_dim_id as u64) * 12; // table entries
851        }
852
853        let mut output = Vec::new();
854        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
855
856        let mut current_offset = header_size;
857        let mut all_data: Vec<u8> = Vec::new();
858        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
859
860        for (_, _, max_dim_id, dim_bytes) in &field_data {
861            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
862
863            for dim_id in 0..*max_dim_id {
864                if let Some(bytes) = dim_bytes.get(&dim_id) {
865                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
866                    current_offset += bytes.len() as u64;
867                    all_data.extend_from_slice(bytes);
868                }
869            }
870            field_tables.push(table);
871        }
872
873        // Write field headers and tables
874        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
875            output.write_u32::<LittleEndian>(*field_id)?;
876            output.write_u8(*quantization as u8)?;
877            output.write_u32::<LittleEndian>(*max_dim_id)?;
878
879            for &(offset, length) in &field_tables[i] {
880                output.write_u64::<LittleEndian>(offset)?;
881                output.write_u32::<LittleEndian>(length)?;
882            }
883        }
884
885        // Append all data
886        output.extend_from_slice(&all_data);
887
888        let output_size = output.len();
889        dir.write(&files.sparse, &output).await?;
890
891        log::info!(
892            "Sparse vector merge complete: {} fields, {} bytes",
893            field_data.len(),
894            output_size
895        );
896
897        Ok(output_size)
898    }
899}
900
901/// Trained vector index structures for rebuilding segments with ANN indexes
902pub struct TrainedVectorStructures {
903    /// Trained centroids per field_id
904    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
905    /// Trained PQ codebooks per field_id (for ScaNN)
906    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
907}
908
909impl SegmentMerger {
910    /// Merge segments and rebuild dense vectors with ANN indexes using trained structures
911    ///
912    /// This is called after centroids/codebooks are trained at index level.
913    /// It collects Flat vectors from all segments and builds IVF-RaBitQ or ScaNN indexes.
914    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
915        &self,
916        dir: &D,
917        segments: &[SegmentReader],
918        new_segment_id: SegmentId,
919        trained: &TrainedVectorStructures,
920    ) -> Result<SegmentMeta> {
921        let files = SegmentFiles::new(new_segment_id.0);
922
923        // Build merged term dictionary and postings
924        let mut term_dict_data = Vec::new();
925        let mut postings_data = Vec::new();
926        let mut stats = MergeStats::default();
927        self.merge_postings_with_stats(
928            segments,
929            &mut term_dict_data,
930            &mut postings_data,
931            &mut stats,
932        )
933        .await?;
934
935        // Stack store files
936        let mut store_data = Vec::new();
937        {
938            let mut store_merger = StoreMerger::new(&mut store_data);
939            for segment in segments {
940                let raw_blocks = segment.store_raw_blocks();
941                let data_slice = segment.store_data_slice();
942                store_merger.append_store(data_slice, &raw_blocks).await?;
943            }
944            store_merger.finish()?;
945        }
946
947        // Write text index files
948        dir.write(&files.term_dict, &term_dict_data).await?;
949        dir.write(&files.postings, &postings_data).await?;
950        dir.write(&files.store, &store_data).await?;
951
952        drop(term_dict_data);
953        drop(postings_data);
954        drop(store_data);
955
956        // Build ANN indexes using trained centroids
957        let vectors_bytes = self
958            .build_ann_vectors(dir, segments, &files, trained)
959            .await?;
960
961        // Merge field stats
962        let mut merged_field_stats: rustc_hash::FxHashMap<u32, FieldStats> =
963            rustc_hash::FxHashMap::default();
964        for segment in segments {
965            for (&field_id, field_stats) in &segment.meta().field_stats {
966                let entry = merged_field_stats.entry(field_id).or_default();
967                entry.total_tokens += field_stats.total_tokens;
968                entry.doc_count += field_stats.doc_count;
969            }
970        }
971
972        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
973        let meta = SegmentMeta {
974            id: new_segment_id.0,
975            num_docs: total_docs,
976            field_stats: merged_field_stats,
977        };
978
979        dir.write(&files.meta, &meta.serialize()?).await?;
980
981        log::info!(
982            "ANN merge complete: {} docs, vectors={}",
983            total_docs,
984            MergeStats::format_memory(vectors_bytes)
985        );
986
987        Ok(meta)
988    }
989
990    /// Build ANN indexes from Flat vectors using trained centroids
991    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
992        &self,
993        dir: &D,
994        segments: &[SegmentReader],
995        files: &SegmentFiles,
996        trained: &TrainedVectorStructures,
997    ) -> Result<usize> {
998        use crate::dsl::VectorIndexType;
999        use byteorder::{LittleEndian, WriteBytesExt};
1000
1001        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
1002
1003        for (field, entry) in self.schema.fields() {
1004            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
1005                continue;
1006            }
1007
1008            let config = match &entry.dense_vector_config {
1009                Some(c) => c,
1010                None => continue,
1011            };
1012
1013            // Collect all Flat vectors from segments
1014            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
1015            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
1016            let mut doc_offset = 0u32;
1017
1018            for segment in segments {
1019                if let Some(super::VectorIndex::Flat(flat_data)) =
1020                    segment.vector_indexes().get(&field.0)
1021                {
1022                    for (vec, (local_doc_id, ordinal)) in
1023                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
1024                    {
1025                        all_vectors.push(vec.clone());
1026                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
1027                    }
1028                }
1029                doc_offset += segment.num_docs();
1030            }
1031
1032            if all_vectors.is_empty() {
1033                continue;
1034            }
1035
1036            let dim = config.index_dim();
1037
1038            // Extract just doc_ids for ANN indexes (they don't track ordinals yet)
1039            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();
1040
1041            // Build ANN index based on index type and available trained structures
1042            match config.index_type {
1043                VectorIndexType::IvfRaBitQ => {
1044                    if let Some(centroids) = trained.centroids.get(&field.0) {
1045                        // Create RaBitQ codebook for the dimension
1046                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
1047                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
1048
1049                        // Build IVF-RaBitQ index
1050                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
1051                            .with_store_raw(config.store_raw);
1052                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
1053                            ivf_config,
1054                            centroids,
1055                            &codebook,
1056                            &all_vectors,
1057                            Some(&ann_doc_ids),
1058                        );
1059
1060                        let index_data = super::builder::IVFRaBitQIndexData {
1061                            centroids: (**centroids).clone(),
1062                            codebook,
1063                            index: ivf_index,
1064                        };
1065                        let bytes = index_data
1066                            .to_bytes()
1067                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1068                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
1069
1070                        log::info!(
1071                            "Built IVF-RaBitQ index for field {} with {} vectors",
1072                            field.0,
1073                            all_vectors.len()
1074                        );
1075                        continue;
1076                    }
1077                }
1078                VectorIndexType::ScaNN => {
1079                    if let (Some(centroids), Some(codebook)) = (
1080                        trained.centroids.get(&field.0),
1081                        trained.codebooks.get(&field.0),
1082                    ) {
1083                        // Build ScaNN (IVF-PQ) index
1084                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
1085                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
1086                            ivf_pq_config,
1087                            centroids,
1088                            codebook,
1089                            &all_vectors,
1090                            Some(&ann_doc_ids),
1091                        );
1092
1093                        let index_data = super::builder::ScaNNIndexData {
1094                            centroids: (**centroids).clone(),
1095                            codebook: (**codebook).clone(),
1096                            index: ivf_pq_index,
1097                        };
1098                        let bytes = index_data
1099                            .to_bytes()
1100                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1101                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
1102
1103                        log::info!(
1104                            "Built ScaNN index for field {} with {} vectors",
1105                            field.0,
1106                            all_vectors.len()
1107                        );
1108                        continue;
1109                    }
1110                }
1111                _ => {}
1112            }
1113
1114            // Fallback: keep as Flat if no trained structures available
1115            let flat_data = super::builder::FlatVectorData {
1116                dim,
1117                vectors: all_vectors,
1118                doc_ids: all_doc_ids,
1119            };
1120            let bytes = serde_json::to_vec(&flat_data)
1121                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
1122            field_indexes.push((field.0, 3u8, bytes)); // 3 = Flat
1123        }
1124
1125        // Write vectors file
1126        if !field_indexes.is_empty() {
1127            field_indexes.sort_by_key(|(id, _, _)| *id);
1128
1129            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
1130            let mut output = Vec::new();
1131
1132            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
1133
1134            let mut current_offset = header_size as u64;
1135            for (field_id, index_type, data) in &field_indexes {
1136                output.write_u32::<LittleEndian>(*field_id)?;
1137                output.write_u8(*index_type)?;
1138                output.write_u64::<LittleEndian>(current_offset)?;
1139                output.write_u64::<LittleEndian>(data.len() as u64)?;
1140                current_offset += data.len() as u64;
1141            }
1142
1143            for (_, _, data) in field_indexes {
1144                output.extend_from_slice(&data);
1145            }
1146
1147            let output_size = output.len();
1148            dir.write(&files.vectors, &output).await?;
1149            return Ok(output_size);
1150        }
1151
1152        Ok(0)
1153    }
1154}
1155
1156/// Delete segment files from directory
1157pub async fn delete_segment<D: Directory + DirectoryWriter>(
1158    dir: &D,
1159    segment_id: SegmentId,
1160) -> Result<()> {
1161    let files = SegmentFiles::new(segment_id.0);
1162    let _ = dir.delete(&files.term_dict).await;
1163    let _ = dir.delete(&files.postings).await;
1164    let _ = dir.delete(&files.store).await;
1165    let _ = dir.delete(&files.meta).await;
1166    let _ = dir.delete(&files.vectors).await;
1167    Ok(())
1168}