hermes_core/segment/
merger.rs

//! Segment merger for combining multiple segments

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments - uses optimized store stacking when possible
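    ///
    /// # Example (sketch)
    ///
    /// Assumes an already-populated directory and open `SegmentReader`s; the
    /// snippet below is illustrative, not a compiled doctest.
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(schema.clone());
    /// let meta = merger.merge(&dir, &readers, SegmentId(42)).await?;
    /// assert_eq!(meta.num_docs, readers.iter().map(|r| r.num_docs()).sum::<u32>());
    /// ```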
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Check if we can use optimized store stacking (no dictionaries)
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

    /// Optimized merge: stack compressed store blocks, rebuild only term dict/postings
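    ///
    /// Store blocks are appended byte-for-byte via `StoreMerger`, so stored documents
    /// are never recompressed. The term dictionary and postings are still rebuilt
    /// because doc IDs shift by each segment's offset, and per-field dense vector
    /// indexes are merged (or rebuilt) separately.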
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings (still need to rebuild these)
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

        // Stack store files without recompression
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Merge dense vector indexes
        self.merge_dense_vectors(dir, segments, &files).await?;

        // Merge field stats
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

    /// Fallback merge: rebuild everything from documents
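    ///
    /// Taken when any source segment's store uses a compression dictionary (see the
    /// `store_has_dict()` check in `merge`), since such blocks cannot be stacked
    /// verbatim. Every document is re-read and re-indexed through a fresh
    /// `SegmentBuilder`.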
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

    /// Merge postings from multiple segments using hybrid stacking
    ///
    /// Optimization: for terms that exist in only one segment, the posting data is
    /// taken straight from that segment with only its doc IDs remapped. Only terms
    /// that exist in multiple segments need a full merge.
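    ///
    /// Work is split into three phases so the synchronous `SSTableWriter` is never
    /// held across an `.await` point: (1) collect every term and the segments it
    /// appears in, (2) produce a `TermInfo` per term by copying or merging its
    /// postings, (3) write the finished entries into the term dictionary.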
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

        // Phase 1: Collect all terms and track which segments contain each term
        // Key -> Vec<(segment_index, term_info, doc_offset)>
        let mut term_sources: BTreeMap<Vec<u8>, Vec<(usize, TermInfo, u32)>> = BTreeMap::new();
        let mut doc_offset = 0u32;

        for (seg_idx, segment) in segments.iter().enumerate() {
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                term_sources
                    .entry(key)
                    .or_default()
                    .push((seg_idx, term_info, doc_offset));
            }
            doc_offset += segment.num_docs();
        }

        // Phase 2: Process terms - collect all results first (async)
        // This avoids holding SSTableWriter across await points
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::with_capacity(term_sources.len());

        for (key, sources) in term_sources {
            let term_info = if sources.len() == 1 {
                // Optimization: Term exists in only one segment - copy directly
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                // Term exists in multiple segments - need full merge
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((key, term_info));
        }

        // Phase 3: Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(())
    }

    /// Build the posting entry for a term that appears in a single source segment.
    ///
    /// Doc IDs are adjusted by adding the segment's doc offset. Because doc IDs are
    /// delta-encoded, external postings are decoded, remapped, and re-encoded rather
    /// than copied byte-for-byte; small lists are re-inlined when possible.
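    ///
    /// For example, when merging segments with 1000 and 500 documents, postings from
    /// the second segment are copied with `doc_offset = 1000`, so its local doc 5
    /// becomes doc 1005 in the merged segment.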
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Handle inline postings - need to remap doc IDs
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // If it can't be inlined after remapping (shouldn't happen), fall through
            // to an external posting list
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.add(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External posting - read, decode, remap doc IDs, re-encode
        // Note: We can't just copy bytes because doc IDs are delta-encoded
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        // Remap doc IDs
        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        // Try to inline if small enough
        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

    /// Merge postings for a term that exists in multiple segments
    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
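    ///
    /// Sources are concatenated in ascending doc-offset order, so the merged list
    /// stays sorted. For instance, if segment A (offset 0) holds docs [2, 7] and
    /// segment B (offset 100) holds docs [1, 3] for the term, the merged postings
    /// are [2, 7, 101, 103].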
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sort sources by doc_offset to ensure postings are added in sorted order
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        // Check if all sources are external (can use block concatenation)
        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Slow path: full decode/encode for inline postings or single source
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline posting list
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                // External posting list
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        // Try to inline small posting lists
        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }

    /// Merge dense vector indexes from multiple segments into a unified file
    ///
    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
    /// For RaBitQ: must rebuild with a new centroid from all vectors
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index_type, data)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Check if all segments have ScaNN indexes for this field
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                // All segments have ScaNN - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                // Calculate doc_id offsets
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Check if all segments have IVF indexes for this field
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                // All segments have IVF - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                // Calculate doc_id offsets
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fall back to RaBitQ rebuild (collect raw vectors)
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
            }
        }

        // Write unified vectors file with index_type
        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // Header: num_fields + (field_id, index_type, offset, len) per field
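            //
            // Resulting file layout (all integers little-endian):
            //   [num_fields: u32]
            //   num_fields x ([field_id: u32][index_type: u8][offset: u64][len: u64])
            //   concatenated index payloads, in ascending field_id order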
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            dir.write(&files.vectors, &output).await?;
        }

        Ok(())
    }
}

/// Delete segment files from directory
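///
/// Individual delete errors are ignored so that a missing file (for example, a
/// segment that never wrote a vectors file) does not abort cleanup.
///
/// # Example (sketch)
///
/// ```ignore
/// // Hypothetical post-merge cleanup: drop the source segments once the new
/// // merged segment has been committed. `old_segment_ids` is illustrative.
/// for old_id in old_segment_ids {
///     delete_segment(&dir, old_id).await?;
/// }
/// ```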
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    Ok(())
}