// hermes_core/segment/merger.rs

1//! Segment merger for combining multiple segments
2
3use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use rustc_hash::FxHashMap;
8
9use super::builder::{SegmentBuilder, SegmentBuilderConfig};
10use super::reader::SegmentReader;
11use super::store::StoreMerger;
12use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
13use crate::Result;
14use crate::directories::{Directory, DirectoryWriter};
15use crate::dsl::{FieldType, Schema};
16use crate::structures::{
17    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
18};
19
20/// Statistics for merge operations
21#[derive(Debug, Clone, Default)]
22pub struct MergeStats {
23    /// Number of terms processed
24    pub terms_processed: usize,
25    /// Number of postings merged
26    pub postings_merged: usize,
27    /// Peak memory usage in bytes (estimated)
28    pub peak_memory_bytes: usize,
29    /// Current memory usage in bytes (estimated)
30    pub current_memory_bytes: usize,
31    /// Term dictionary output size
32    pub term_dict_bytes: usize,
33    /// Postings output size
34    pub postings_bytes: usize,
35    /// Store output size
36    pub store_bytes: usize,
37    /// Vector index output size
38    pub vectors_bytes: usize,
39    /// Sparse vector index output size
40    pub sparse_bytes: usize,
41}
42
43impl MergeStats {
44    /// Format memory as human-readable string
45    pub fn format_memory(bytes: usize) -> String {
46        if bytes >= 1024 * 1024 * 1024 {
47            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
48        } else if bytes >= 1024 * 1024 {
49            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
50        } else if bytes >= 1024 {
51            format!("{:.2} KB", bytes as f64 / 1024.0)
52        } else {
53            format!("{} B", bytes)
54        }
55    }
56}
57
/// Entry for k-way merge heap
///
/// One pending term from one segment's term-dictionary iterator. The heap
/// orders entries by `key` (reversed — see the `Ord` impl below) so the
/// lexicographically smallest term across all segments is popped first.
struct MergeEntry {
    /// Term bytes; the heap's sort key
    key: Vec<u8>,
    /// Posting-list metadata for this term in its source segment
    term_info: TermInfo,
    /// Index of the source segment in the merge input slice
    segment_idx: usize,
    /// Doc-ID offset added when remapping this segment's postings
    doc_offset: u32,
}
65
66impl PartialEq for MergeEntry {
67    fn eq(&self, other: &Self) -> bool {
68        self.key == other.key
69    }
70}
71
72impl Eq for MergeEntry {}
73
74impl PartialOrd for MergeEntry {
75    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
76        Some(self.cmp(other))
77    }
78}
79
80impl Ord for MergeEntry {
81    fn cmp(&self, other: &Self) -> Ordering {
82        // Reverse order for min-heap (BinaryHeap is max-heap by default)
83        other.key.cmp(&self.key)
84    }
85}
86
/// Segment merger - merges multiple segments into one
///
/// Chooses between an optimized merge (store/block stacking, no re-indexing)
/// and a full document-by-document rebuild, depending on schema features and
/// store format; see `merge_with_stats`.
pub struct SegmentMerger {
    /// Index schema shared with segment readers and builders
    schema: Arc<Schema>,
}
91
92impl SegmentMerger {
93    pub fn new(schema: Arc<Schema>) -> Self {
94        Self { schema }
95    }
96
97    /// Merge segments - uses optimized store stacking when possible
98    pub async fn merge<D: Directory + DirectoryWriter>(
99        &self,
100        dir: &D,
101        segments: &[SegmentReader],
102        new_segment_id: SegmentId,
103    ) -> Result<SegmentMeta> {
104        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
105        Ok(meta)
106    }
107
108    /// Merge segments with memory tracking - returns merge statistics
109    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
110        &self,
111        dir: &D,
112        segments: &[SegmentReader],
113        new_segment_id: SegmentId,
114    ) -> Result<(SegmentMeta, MergeStats)> {
115        // Check if we can use optimized store stacking (no dictionaries)
116        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
117
118        // Check if any segment has positions - if so, use rebuild merge
119        // (positions require doc ID remapping which optimized merge doesn't handle)
120        let has_positions = self
121            .schema
122            .fields()
123            .any(|(_, entry)| entry.positions.is_some());
124
125        // Note: sparse vectors now use optimized block stacking merge (no rebuild needed)
126        if can_stack_stores && !has_positions {
127            self.merge_optimized_with_stats(dir, segments, new_segment_id)
128                .await
129        } else {
130            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
131                .await
132        }
133    }
134
    /// Optimized merge with stats tracking
    ///
    /// Produces the merged segment without re-indexing documents:
    /// postings are k-way merged term-by-term (`merge_postings_with_stats`),
    /// store blocks are stacked without recompression (`StoreMerger`), and
    /// dense/sparse vector indexes are merged by their dedicated fast paths.
    /// Returns the new segment metadata plus output-size/memory statistics.
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        // Track peak memory (term dict + postings buffers)
        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Stack store files without recompression.
        // Scoped so the merger's mutable borrow of `store_data` ends before
        // we read `store_data.len()` below.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        // Update peak memory
        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Free memory after writing
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        // Merge dense vector indexes
        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // Merge sparse vector indexes (optimized block stacking)
        let sparse_bytes = self
            .merge_sparse_vectors_optimized(dir, segments, &files)
            .await?;
        stats.sparse_bytes = sparse_bytes;

        // Merge field stats: per-field token/doc totals are additive across segments
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
            MergeStats::format_memory(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }
236
237    /// Fallback merge with stats tracking
238    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
239        &self,
240        dir: &D,
241        segments: &[SegmentReader],
242        new_segment_id: SegmentId,
243    ) -> Result<(SegmentMeta, MergeStats)> {
244        let mut stats = MergeStats::default();
245
246        let mut builder =
247            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
248
249        for segment in segments {
250            for doc_id in 0..segment.num_docs() {
251                if let Some(doc) = segment.doc(doc_id).await? {
252                    builder.add_document(doc)?;
253                }
254
255                // Track memory periodically
256                if doc_id % 10000 == 0 {
257                    let builder_stats = builder.stats();
258                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
259                    stats.peak_memory_bytes =
260                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
261                }
262            }
263        }
264
265        let meta = builder.build(dir, new_segment_id).await?;
266        Ok((meta, stats))
267    }
268
    /// Merge postings from multiple segments using streaming k-way merge
    ///
    /// This implementation uses a min-heap to merge terms from all segments
    /// in sorted order without loading all terms into memory at once.
    /// Memory usage is O(num_segments) instead of O(total_terms).
    ///
    /// Optimization: For terms that exist in only one segment, we copy the
    /// posting data directly without decode/encode. Only terms that exist
    /// in multiple segments need full merge.
    ///
    /// Merged term/posting data is appended to `term_dict` and `postings_out`.
    /// Returns the number of terms processed.
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Calculate doc offsets for each segment: segment i's doc IDs are
        // shifted by the total doc count of segments 0..i
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        // (MergeEntry's Ord is reversed, so BinaryHeap pops the smallest key)
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        // Collect results for SSTable writing (need to buffer due to async:
        // SSTableWriter insertion is sync and must not interleave with awaits)
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            // Check if other segments have the same key; drain them all so
            // each distinct term is processed exactly once
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            // Process this term
            let term_info = if sources.len() == 1 {
                // Optimization: Term exists in only one segment - copy directly
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                // Term exists in multiple segments - need full merge
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        log::info!(
            "Merge complete: {} terms processed from {} segments",
            terms_processed,
            segments.len()
        );

        // Track memory for term_results buffer (element headers only; key
        // byte contents are not counted — this is an estimate)
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        // Write to SSTable (sync, no await points); heap order guarantees
        // keys are inserted in sorted order as the writer requires
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }
407
    /// Copy a term's posting data directly from source segment (no decode/encode)
    /// Only adjusts doc IDs by adding the segment's doc offset
    ///
    /// Despite the name, external postings are decoded and re-encoded because
    /// doc IDs are delta-encoded and must be remapped; only the term's data is
    /// copied without merging against other segments.
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Handle inline postings - need to remap doc IDs
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // If can't inline after remapping (shouldn't happen), fall through to external
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External posting - read, decode, remap doc IDs, re-encode
        // Note: We can't just copy bytes because doc IDs are delta-encoded
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        // Remap doc IDs by the segment's base offset
        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        // Try to inline if small enough
        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }
475
    /// Merge postings for a term that exists in multiple segments
    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
    ///
    /// Sources are `(segment_idx, term_info, doc_offset)` tuples; the merged
    /// posting list is appended to `postings_out` unless it is small enough
    /// to inline into the returned `TermInfo`.
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sort sources by doc_offset to ensure postings are added in sorted order
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        // Check if all sources are external (can use block concatenation)
        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: block-level concatenation — blocks are stacked and
            // only per-block base doc IDs are shifted by each doc_offset
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Slow path: full decode/encode for inline postings or single source
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline posting list
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                // External posting list
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        // Try to inline small posting lists
        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }
    /// Merge dense vector indexes with stats tracking - returns output size in bytes
    ///
    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
    /// For RaBitQ: Must rebuild with new centroid from all vectors
    ///
    /// Writes a unified vectors file: a header of (field_id, index_type,
    /// offset, len) entries followed by each field's serialized index.
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index_type, data) — index_type: 0 = RaBitQ, 1 = IVF-RaBitQ, 2 = ScaNN
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Check if all segments have ScaNN indexes for this field
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                // All segments have ScaNN - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                // Calculate doc_id offsets
                // NOTE(review): doc_offsets covers every segment, while `refs`
                // only covers segments that have this field's index — confirm
                // IVFPQIndex::merge tolerates the length mismatch when some
                // segment lacks a dense vector index.
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Check if all segments have IVF indexes for this field
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                // All segments have IVF - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                // Calculate doc_id offsets (same review note as the ScaNN path above)
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fall back to RaBitQ rebuild (collect raw vectors)
            // NOTE(review): segments whose index lacks raw_vectors contribute
            // nothing here — their vectors are dropped from the rebuilt index;
            // confirm raw vectors are always retained when RaBitQ is in use.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
            }
        }

        // Write unified vectors file with index_type
        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // Header: num_fields + (field_id, index_type, offset, len) per field
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            // Payload offsets are absolute within the file, starting right
            // after the fixed-size header
            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
716
717    /// Merge sparse vector indexes using optimized block stacking
718    ///
719    /// This is O(blocks) instead of O(postings) - we stack blocks directly
720    /// and only adjust the first_doc_id in each block header by the doc offset.
721    /// Deltas within blocks remain unchanged since they're relative.
722    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
723        &self,
724        dir: &D,
725        segments: &[SegmentReader],
726        files: &SegmentFiles,
727    ) -> Result<usize> {
728        use crate::structures::BlockSparsePostingList;
729        use byteorder::{LittleEndian, WriteBytesExt};
730
731        // Calculate doc offsets for each segment
732        let mut doc_offsets = Vec::with_capacity(segments.len());
733        let mut offset = 0u32;
734        for segment in segments {
735            doc_offsets.push(offset);
736            offset += segment.num_docs();
737        }
738
739        // Collect all sparse vector fields from schema
740        let sparse_fields: Vec<_> = self
741            .schema
742            .fields()
743            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
744            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
745            .collect();
746
747        if sparse_fields.is_empty() {
748            return Ok(0);
749        }
750
751        // Collect field data: (field_id, quantization, max_dim_id, dim_id -> merged_posting_list)
752        type SparseFieldData = (
753            u32,
754            crate::structures::WeightQuantization,
755            u32,
756            FxHashMap<u32, Vec<u8>>,
757        );
758        let mut field_data: Vec<SparseFieldData> = Vec::new();
759
760        for (field, sparse_config) in &sparse_fields {
761            // Get quantization from config
762            let quantization = sparse_config
763                .as_ref()
764                .map(|c| c.weight_quantization)
765                .unwrap_or(crate::structures::WeightQuantization::Float32);
766
767            // Collect all dimensions across all segments for this field
768            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
769            for segment in segments {
770                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
771                    for (dim_id, posting) in sparse_index.postings.iter().enumerate() {
772                        if posting.is_some() {
773                            all_dims.insert(dim_id as u32);
774                        }
775                    }
776                }
777            }
778
779            if all_dims.is_empty() {
780                continue;
781            }
782
783            let max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
784            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();
785
786            // For each dimension, merge posting lists from all segments
787            for dim_id in all_dims {
788                // Collect posting lists for this dimension from all segments
789                let mut lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = Vec::new();
790
791                for (seg_idx, segment) in segments.iter().enumerate() {
792                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
793                        && let Some(Some(posting_list)) = sparse_index.postings.get(dim_id as usize)
794                    {
795                        lists_with_offsets.push((posting_list.as_ref(), doc_offsets[seg_idx]));
796                    }
797                }
798
799                if lists_with_offsets.is_empty() {
800                    continue;
801                }
802
803                // Merge using optimized block stacking
804                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);
805
806                // Serialize
807                let mut bytes = Vec::new();
808                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
809                dim_bytes.insert(dim_id, bytes);
810            }
811
812            field_data.push((field.0, quantization, max_dim_id + 1, dim_bytes));
813        }
814
815        if field_data.is_empty() {
816            return Ok(0);
817        }
818
819        // Sort by field_id
820        field_data.sort_by_key(|(id, _, _, _)| *id);
821
822        // Build sparse file (same format as builder)
823        // Header: num_fields (4)
824        // Per field: field_id (4) + quant (1) + max_dim_id (4) + table (12 * max_dim_id)
825        let mut header_size = 4u64;
826        for (_, _, max_dim_id, _) in &field_data {
827            header_size += 4 + 1 + 4; // field_id + quant + max_dim_id
828            header_size += (*max_dim_id as u64) * 12; // table entries
829        }
830
831        let mut output = Vec::new();
832        output.write_u32::<LittleEndian>(field_data.len() as u32)?;
833
834        let mut current_offset = header_size;
835        let mut all_data: Vec<u8> = Vec::new();
836        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();
837
838        for (_, _, max_dim_id, dim_bytes) in &field_data {
839            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];
840
841            for dim_id in 0..*max_dim_id {
842                if let Some(bytes) = dim_bytes.get(&dim_id) {
843                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
844                    current_offset += bytes.len() as u64;
845                    all_data.extend_from_slice(bytes);
846                }
847            }
848            field_tables.push(table);
849        }
850
851        // Write field headers and tables
852        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
853            output.write_u32::<LittleEndian>(*field_id)?;
854            output.write_u8(*quantization as u8)?;
855            output.write_u32::<LittleEndian>(*max_dim_id)?;
856
857            for &(offset, length) in &field_tables[i] {
858                output.write_u64::<LittleEndian>(offset)?;
859                output.write_u32::<LittleEndian>(length)?;
860            }
861        }
862
863        // Append all data
864        output.extend_from_slice(&all_data);
865
866        let output_size = output.len();
867        dir.write(&files.sparse, &output).await?;
868
869        log::info!(
870            "Sparse vector merge complete: {} fields, {} bytes",
871            field_data.len(),
872            output_size
873        );
874
875        Ok(output_size)
876    }
877}
878
/// Trained vector index structures for rebuilding segments with ANN indexes
///
/// Produced by index-level training and consumed by
/// `SegmentMerger::merge_with_ann`, which looks up each dense-vector field's
/// entry by `field_id` to decide which ANN index it can build:
/// IVF-RaBitQ needs only `centroids`; ScaNN needs both `centroids` and
/// `codebooks`. Fields missing from these maps fall back to a Flat index.
/// Values are `Arc`-shared so the same trained structures can be reused
/// across multiple merges without copying.
pub struct TrainedVectorStructures {
    /// Trained coarse (IVF) centroids per field_id
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Trained PQ codebooks per field_id (for ScaNN)
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}
886
impl SegmentMerger {
    /// Merge segments and rebuild dense vectors with ANN indexes using trained structures
    ///
    /// This is called after centroids/codebooks are trained at index level.
    /// It collects Flat vectors from all segments and builds IVF-RaBitQ or ScaNN indexes.
    ///
    /// Writes the merged term dictionary, postings, document store, dense-vector
    /// file, and segment metadata for `new_segment_id`, then returns the new
    /// segment's [`SegmentMeta`].
    ///
    /// NOTE(review): unlike the sparse merge path elsewhere in this file, this
    /// method never writes `files.sparse` — confirm ANN merges are only used on
    /// schemas without sparse-vector fields, or sparse data is dropped here.
    ///
    /// # Errors
    /// Propagates I/O and serialization errors from posting/store merging and
    /// directory writes.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let mut stats = MergeStats::default();
        self.merge_postings_with_stats(
            segments,
            &mut term_dict_data,
            &mut postings_data,
            &mut stats,
        )
        .await?;

        // Stack store files: each segment's raw store blocks are appended
        // in segment order, matching the doc-id offsets used below.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        // Write text index files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Release the merged buffers before the (memory-hungry) ANN build.
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        // Build ANN indexes using trained centroids
        let vectors_bytes = self
            .build_ann_vectors(dir, segments, &files, trained)
            .await?;

        // Merge field stats: per-field token and doc counts are additive
        // across segments.
        let mut merged_field_stats: rustc_hash::FxHashMap<u32, FieldStats> =
            rustc_hash::FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "ANN merge complete: {} docs, vectors={}",
            total_docs,
            MergeStats::format_memory(vectors_bytes)
        );

        Ok(meta)
    }

    /// Build ANN indexes from Flat vectors using trained centroids
    ///
    /// For each indexed dense-vector field, gathers Flat vectors from every
    /// input segment (remapping local doc ids by the running segment offset),
    /// then builds the index requested by the field config:
    /// - `IvfRaBitQ` (tag 1) when trained centroids exist for the field,
    /// - `ScaNN` (tag 2) when both centroids and a PQ codebook exist,
    /// - otherwise falls back to a Flat index (tag 3).
    ///
    /// Output file layout: `num_fields: u32` header, then per field
    /// `field_id: u32, index_type: u8, offset: u64, length: u64` entries
    /// (21 bytes each), followed by the concatenated index payloads. All
    /// integers little-endian. Returns the total bytes written (0 if no
    /// field produced an index — in that case no vectors file is written).
    ///
    /// NOTE(review): segments whose field holds a non-Flat index are silently
    /// skipped while their doc_offset still advances — presumably inputs are
    /// always Flat at this stage; confirm with the caller.
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index_type_tag, serialized index bytes)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            // Collect all Flat vectors from segments
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            // (global doc_id, ordinal) pairs parallel to all_vectors
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        // Remap segment-local doc id into the merged doc space.
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            // Extract just doc_ids for ANN indexes (they don't track ordinals yet)
            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            // Build ANN index based on index type and available trained structures
            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        // Create RaBitQ codebook for the dimension
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        // Build IVF-RaBitQ index
                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ

                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        // Build ScaNN (IVF-PQ) index
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN

                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            // Fallback: keep as Flat if no trained structures available
            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = serde_json::to_vec(&flat_data)
                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
            field_indexes.push((field.0, 3u8, bytes)); // 3 = Flat
        }

        // Write vectors file
        if !field_indexes.is_empty() {
            // Sort by field_id so readers can binary-search the table.
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // num_fields (4) + per-field entry: id (4) + type (1) + offset (8) + len (8)
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            // Payload offsets are absolute, starting right after the header.
            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}
1133
1134/// Delete segment files from directory
1135pub async fn delete_segment<D: Directory + DirectoryWriter>(
1136    dir: &D,
1137    segment_id: SegmentId,
1138) -> Result<()> {
1139    let files = SegmentFiles::new(segment_id.0);
1140    let _ = dir.delete(&files.term_dict).await;
1141    let _ = dir.delete(&files.postings).await;
1142    let _ = dir.delete(&files.store).await;
1143    let _ = dir.delete(&files.meta).await;
1144    let _ = dir.delete(&files.vectors).await;
1145    Ok(())
1146}