hermes_core/segment/merger.rs

//! Segment merger for combining multiple segments

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments - uses optimized store stacking when no source segment's
    /// store uses a dictionary
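    ///
    /// A minimal usage sketch (hypothetical call site: `schema`, `dir`,
    /// `readers`, and `new_segment_id` are assumed to come from the caller):
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(Arc::clone(&schema));
    /// let merged = merger.merge(&dir, &readers, new_segment_id).await?;
    /// println!("merged {} docs into segment {}", merged.num_docs, merged.id);
    /// ```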
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Check if we can use optimized store stacking (no dictionaries)
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

    /// Optimized merge: stack compressed store blocks, rebuild only term dict/postings
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings (still need to rebuild these)
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

        // Stack store files without recompression
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Merge dense vector indexes
        self.merge_dense_vectors(dir, segments, &files).await?;

        // Merge field stats
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

    /// Fallback merge: rebuild everything from documents
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

    /// Merge postings from multiple segments using hybrid stacking
    ///
    /// Optimization: For terms that exist in only one segment, we remap doc IDs
    /// and copy the posting data over without a multi-way merge. Only terms that
    /// exist in multiple segments need a full merge.
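    ///
    /// Illustrative (hypothetical) layout: a term appearing only in segment 2
    /// goes through `copy_term_posting` with that segment's doc offset applied,
    /// while a term appearing in both segment 0 and segment 2 goes through
    /// `merge_term_postings`.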
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

        // Phase 1: Collect all terms and track which segments contain each term
        // Key -> Vec<(segment_index, term_info, doc_offset)>
        let mut term_sources: BTreeMap<Vec<u8>, Vec<(usize, TermInfo, u32)>> = BTreeMap::new();
        let mut doc_offset = 0u32;

        for (seg_idx, segment) in segments.iter().enumerate() {
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                term_sources
                    .entry(key)
                    .or_default()
                    .push((seg_idx, term_info, doc_offset));
            }
            doc_offset += segment.num_docs();
        }
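        // Worked example (hypothetical sizes): with segments of 100, 50, and 25
        // docs, doc_offset is 0, 100, and 150 respectively, so local doc_id 3 in
        // the third segment becomes global doc_id 153 in the merged segment.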

        // Phase 2: Process terms - collect all results first (async)
        // This avoids holding SSTableWriter across await points
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::with_capacity(term_sources.len());

        for (key, sources) in term_sources {
            let term_info = if sources.len() == 1 {
                // Optimization: Term exists in only one segment - copy directly
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                // Term exists in multiple segments - need full merge
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((key, term_info));
        }

        // Phase 3: Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(())
    }

    /// Copy a term's posting data from its single source segment, remapping
    /// doc IDs by the segment's doc offset. Inline postings are re-inlined when
    /// they still fit; external postings are decoded and re-encoded because
    /// their doc IDs are delta-encoded.
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Handle inline postings - need to remap doc IDs
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // If the postings no longer fit inline after remapping (larger doc IDs
            // can need more bytes), fall back to an external posting list
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External posting - read, decode, remap doc IDs, re-encode
        // Note: We can't just copy bytes because doc IDs are delta-encoded
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        // Remap doc IDs
        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        // Try to inline if small enough
        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

    /// Merge postings for a term that exists in multiple segments.
    /// When every source is an external posting list, uses block-level
    /// concatenation: O(num_blocks) instead of O(num_postings).
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sort sources by doc_offset to ensure postings are added in sorted order
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        // Check if all sources are external (can use block concatenation)
        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Slow path: full decode/encode for inline postings or single source
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline posting list
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                // External posting list
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        // Try to inline small posting lists
        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }

    /// Merge dense vector indexes from multiple segments into a unified vectors file
    ///
    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
    /// For RaBitQ: Must rebuild with new centroid from all vectors
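    ///
    /// For example (assumed setup): if every vector-bearing segment carries an
    /// IVF index built against the same centroid set, the merge concatenates
    /// per-cluster entries and shifts doc IDs by each segment's doc offset;
    /// otherwise all raw vectors are collected and a fresh RaBitQ index is built.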
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Check if every segment with a dense vector index for this field has an IVF index
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                // All vector-bearing segments have IVF - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                // Calculate doc_id offsets
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = serde_json::to_vec(&merged)
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fall back to RaBitQ rebuild (collect raw vectors)
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, index_bytes));
            }
        }

        // Write unified vectors file
        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _)| *id);

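            // File layout written below: a u32 field count, then one
            // (u32 field_id, u64 offset, u64 length) header entry per field,
            // followed by each field's serialized index bytes - hence
            // 4 + num_fields * 20 header bytes.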
            let header_size = 4 + field_indexes.len() * (4 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            dir.write(&files.vectors, &output).await?;
        }

        Ok(())
    }
}

/// Delete a segment's files from the directory (best-effort: individual delete
/// errors are ignored)
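///
/// A minimal sketch (hypothetical `dir` value and segment id):
///
/// ```ignore
/// delete_segment(&dir, stale_segment_id).await?;
/// ```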
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    Ok(())
}