hermes_core/segment/
merger.rs

1//! Segment merger for combining multiple segments
2
3use std::sync::Arc;
4
5use rustc_hash::FxHashMap;
6
7use super::builder::{SegmentBuilder, SegmentBuilderConfig};
8use super::reader::SegmentReader;
9use super::store::StoreMerger;
10use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
11use crate::Result;
12use crate::directories::{Directory, DirectoryWriter};
13use crate::dsl::Schema;
14use crate::structures::{BlockPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo};
15
16/// Segment merger - merges multiple segments into one
17pub struct SegmentMerger {
18    schema: Arc<Schema>,
19}
20
21impl SegmentMerger {
22    pub fn new(schema: Arc<Schema>) -> Self {
23        Self { schema }
24    }
25
26    /// Merge segments - uses optimized store stacking when possible
27    pub async fn merge<D: Directory + DirectoryWriter>(
28        &self,
29        dir: &D,
30        segments: &[SegmentReader],
31        new_segment_id: SegmentId,
32    ) -> Result<SegmentMeta> {
33        // Check if we can use optimized store stacking (no dictionaries)
34        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
35
36        if can_stack_stores {
37            self.merge_optimized(dir, segments, new_segment_id).await
38        } else {
39            self.merge_rebuild(dir, segments, new_segment_id).await
40        }
41    }
42
43    /// Optimized merge: stack compressed store blocks, rebuild only term dict/postings
44    async fn merge_optimized<D: Directory + DirectoryWriter>(
45        &self,
46        dir: &D,
47        segments: &[SegmentReader],
48        new_segment_id: SegmentId,
49    ) -> Result<SegmentMeta> {
50        let files = SegmentFiles::new(new_segment_id.0);
51
52        // Build merged term dictionary and postings (still need to rebuild these)
53        let mut term_dict_data = Vec::new();
54        let mut postings_data = Vec::new();
55        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
56            .await?;
57
58        // Stack store files without recompression
59        let mut store_data = Vec::new();
60        {
61            let mut store_merger = StoreMerger::new(&mut store_data);
62            for segment in segments {
63                let raw_blocks = segment.store_raw_blocks();
64                let data_slice = segment.store_data_slice();
65                store_merger.append_store(data_slice, &raw_blocks).await?;
66            }
67            store_merger.finish()?;
68        }
69
70        // Write files
71        dir.write(&files.term_dict, &term_dict_data).await?;
72        dir.write(&files.postings, &postings_data).await?;
73        dir.write(&files.store, &store_data).await?;
74
75        // Merge field stats
76        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
77        for segment in segments {
78            for (&field_id, stats) in &segment.meta().field_stats {
79                let entry = merged_field_stats.entry(field_id).or_default();
80                entry.total_tokens += stats.total_tokens;
81                entry.doc_count += stats.doc_count;
82            }
83        }
84
85        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
86        let meta = SegmentMeta {
87            id: new_segment_id.0,
88            num_docs: total_docs,
89            field_stats: merged_field_stats,
90        };
91
92        dir.write(&files.meta, &meta.serialize()?).await?;
93
94        Ok(meta)
95    }
96
97    /// Fallback merge: rebuild everything from documents
98    async fn merge_rebuild<D: Directory + DirectoryWriter>(
99        &self,
100        dir: &D,
101        segments: &[SegmentReader],
102        new_segment_id: SegmentId,
103    ) -> Result<SegmentMeta> {
104        let mut builder =
105            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
106
107        for segment in segments {
108            for doc_id in 0..segment.num_docs() {
109                if let Some(doc) = segment.doc(doc_id).await? {
110                    builder.add_document(doc)?;
111                }
112            }
113        }
114
115        builder.build(dir, new_segment_id).await
116    }
117
118    /// Merge postings from multiple segments using hybrid stacking
119    ///
120    /// Optimization: For terms that exist in only one segment, we copy the
121    /// posting data directly without decode/encode. Only terms that exist
122    /// in multiple segments need full merge.
123    async fn merge_postings(
124        &self,
125        segments: &[SegmentReader],
126        term_dict: &mut Vec<u8>,
127        postings_out: &mut Vec<u8>,
128    ) -> Result<()> {
129        use std::collections::BTreeMap;
130
131        // Phase 1: Collect all terms and track which segments contain each term
132        // Key -> Vec<(segment_index, term_info, doc_offset)>
133        let mut term_sources: BTreeMap<Vec<u8>, Vec<(usize, TermInfo, u32)>> = BTreeMap::new();
134        let mut doc_offset = 0u32;
135
136        for (seg_idx, segment) in segments.iter().enumerate() {
137            let terms = segment.all_terms().await?;
138            for (key, term_info) in terms {
139                term_sources
140                    .entry(key)
141                    .or_default()
142                    .push((seg_idx, term_info, doc_offset));
143            }
144            doc_offset += segment.num_docs();
145        }
146
147        // Phase 2: Process terms - collect all results first (async)
148        // This avoids holding SSTableWriter across await points
149        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::with_capacity(term_sources.len());
150
151        for (key, sources) in term_sources {
152            let term_info = if sources.len() == 1 {
153                // Optimization: Term exists in only one segment - copy directly
154                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
155                self.copy_term_posting(
156                    &segments[*seg_idx],
157                    source_info,
158                    *seg_doc_offset,
159                    postings_out,
160                )
161                .await?
162            } else {
163                // Term exists in multiple segments - need full merge
164                self.merge_term_postings(segments, &sources, postings_out)
165                    .await?
166            };
167
168            term_results.push((key, term_info));
169        }
170
171        // Phase 3: Write to SSTable (sync, no await points)
172        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
173        for (key, term_info) in term_results {
174            writer.insert(&key, &term_info)?;
175        }
176        writer.finish()?;
177
178        Ok(())
179    }
180
181    /// Copy a term's posting data directly from source segment (no decode/encode)
182    /// Only adjusts doc IDs by adding the segment's doc offset
183    async fn copy_term_posting(
184        &self,
185        segment: &SegmentReader,
186        source_info: &TermInfo,
187        doc_offset: u32,
188        postings_out: &mut Vec<u8>,
189    ) -> Result<TermInfo> {
190        // Handle inline postings - need to remap doc IDs
191        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
192            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
193            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
194                return Ok(inline);
195            }
196            // If can't inline after remapping (shouldn't happen), fall through to external
197            let mut pl = PostingList::with_capacity(remapped_ids.len());
198            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
199                pl.push(doc_id, tf);
200            }
201            let posting_offset = postings_out.len() as u64;
202            let block_list = BlockPostingList::from_posting_list(&pl)?;
203            let mut encoded = Vec::new();
204            block_list.serialize(&mut encoded)?;
205            postings_out.extend_from_slice(&encoded);
206            return Ok(TermInfo::external(
207                posting_offset,
208                encoded.len() as u32,
209                pl.doc_count(),
210            ));
211        }
212
213        // External posting - read, decode, remap doc IDs, re-encode
214        // Note: We can't just copy bytes because doc IDs are delta-encoded
215        let (offset, len) = source_info.external_info().unwrap();
216        let posting_bytes = segment.read_postings(offset, len).await?;
217        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
218
219        // Remap doc IDs
220        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
221        let mut iter = source_postings.iterator();
222        while iter.doc() != TERMINATED {
223            remapped.add(iter.doc() + doc_offset, iter.term_freq());
224            iter.advance();
225        }
226
227        // Try to inline if small enough
228        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
229        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();
230
231        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
232            return Ok(inline);
233        }
234
235        // Write to postings file
236        let posting_offset = postings_out.len() as u64;
237        let block_list = BlockPostingList::from_posting_list(&remapped)?;
238        let mut encoded = Vec::new();
239        block_list.serialize(&mut encoded)?;
240        postings_out.extend_from_slice(&encoded);
241
242        Ok(TermInfo::external(
243            posting_offset,
244            encoded.len() as u32,
245            remapped.doc_count(),
246        ))
247    }
248
249    /// Merge postings for a term that exists in multiple segments
250    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
251    async fn merge_term_postings(
252        &self,
253        segments: &[SegmentReader],
254        sources: &[(usize, TermInfo, u32)],
255        postings_out: &mut Vec<u8>,
256    ) -> Result<TermInfo> {
257        // Sort sources by doc_offset to ensure postings are added in sorted order
258        let mut sorted_sources: Vec<_> = sources.to_vec();
259        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);
260
261        // Check if all sources are external (can use block concatenation)
262        let all_external = sorted_sources
263            .iter()
264            .all(|(_, term_info, _)| term_info.external_info().is_some());
265
266        if all_external && sorted_sources.len() > 1 {
267            // Fast path: block-level concatenation
268            let mut block_sources = Vec::with_capacity(sorted_sources.len());
269
270            for (seg_idx, term_info, doc_offset) in &sorted_sources {
271                let segment = &segments[*seg_idx];
272                let (offset, len) = term_info.external_info().unwrap();
273                let posting_bytes = segment.read_postings(offset, len).await?;
274                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
275                block_sources.push((source_postings, *doc_offset));
276            }
277
278            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
279            let posting_offset = postings_out.len() as u64;
280            let mut encoded = Vec::new();
281            merged_blocks.serialize(&mut encoded)?;
282            postings_out.extend_from_slice(&encoded);
283
284            return Ok(TermInfo::external(
285                posting_offset,
286                encoded.len() as u32,
287                merged_blocks.doc_count(),
288            ));
289        }
290
291        // Slow path: full decode/encode for inline postings or single source
292        let mut merged = PostingList::new();
293
294        for (seg_idx, term_info, doc_offset) in &sorted_sources {
295            let segment = &segments[*seg_idx];
296
297            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
298                // Inline posting list
299                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
300                    merged.add(doc_id + doc_offset, tf);
301                }
302            } else {
303                // External posting list
304                let (offset, len) = term_info.external_info().unwrap();
305                let posting_bytes = segment.read_postings(offset, len).await?;
306                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
307
308                let mut iter = source_postings.iterator();
309                while iter.doc() != TERMINATED {
310                    merged.add(iter.doc() + doc_offset, iter.term_freq());
311                    iter.advance();
312                }
313            }
314        }
315
316        // Try to inline small posting lists
317        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
318        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
319
320        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
321            return Ok(inline);
322        }
323
324        // Write to postings file
325        let posting_offset = postings_out.len() as u64;
326        let block_list = BlockPostingList::from_posting_list(&merged)?;
327        let mut encoded = Vec::new();
328        block_list.serialize(&mut encoded)?;
329        postings_out.extend_from_slice(&encoded);
330
331        Ok(TermInfo::external(
332            posting_offset,
333            encoded.len() as u32,
334            merged.doc_count(),
335        ))
336    }
337}
338
339/// Delete segment files from directory
340pub async fn delete_segment<D: Directory + DirectoryWriter>(
341    dir: &D,
342    segment_id: SegmentId,
343) -> Result<()> {
344    let files = SegmentFiles::new(segment_id.0);
345    let _ = dir.delete(&files.term_dict).await;
346    let _ = dir.delete(&files.postings).await;
347    let _ = dir.delete(&files.store).await;
348    let _ = dir.delete(&files.meta).await;
349    Ok(())
350}