hermes_core/segment/
merger.rs

//! Segment merger for combining multiple segments

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;
use crate::structures::{BlockPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo};

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments - uses optimized store stacking when possible
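    ///
    /// If none of the source segments use a store compression dictionary, the
    /// compressed store blocks are stacked as-is and only the term dictionary
    /// and postings are rebuilt; otherwise the merge falls back to re-adding
    /// every document through a fresh `SegmentBuilder`.
    ///
    /// A minimal usage sketch (directory and reader construction elided; names
    /// are illustrative only):
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(schema.clone());
    /// // `dir` is any Directory + DirectoryWriter impl; `readers` are the
    /// // SegmentReaders for the segments being merged.
    /// let meta = merger.merge(&dir, &readers, SegmentId(next_id)).await?;
    /// println!("merged segment {} with {} docs", meta.id, meta.num_docs);
    /// ```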
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Check if we can use optimized store stacking (no dictionaries)
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

    /// Optimized merge: stack compressed store blocks, rebuild only term dict/postings
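    ///
    /// Stored blocks are copied verbatim from each source segment, while the
    /// term dictionary and postings are rebuilt with doc IDs shifted by the
    /// cumulative doc count of the preceding segments.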
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings (still need to rebuild these)
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

        // Stack store files without recompression
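        // Each segment hands its raw block table plus its compressed data slice
        // to StoreMerger, which appends them as-is: no block is decompressed
        // and recompressed along the way.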
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Merge field stats
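        // Field statistics are additive across segments: sum the token and
        // doc counts per field.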
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

    /// Fallback merge: rebuild everything from documents
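    ///
    /// Used when at least one source store was built with a compression
    /// dictionary, so its blocks cannot be stacked byte-for-byte. Every
    /// document is re-read and pushed through a fresh `SegmentBuilder`.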
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

    /// Merge postings from multiple segments
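    ///
    /// Terms are accumulated in a `BTreeMap`, so the SSTable writer receives
    /// keys in sorted order; doc IDs are remapped as segments are merged.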
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

        // Collect all terms across all segments with doc ID remapping
        let mut all_terms: BTreeMap<Vec<u8>, PostingList> = BTreeMap::new();
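        // doc_offset is the running total of docs in already-processed segments;
        // adding it to each incoming doc ID keeps merged IDs unique and ascending.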
        let mut doc_offset = 0u32;

        for segment in segments {
            // Iterate through all terms in this segment
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                // Read the posting list - handle both inline and external
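                // Inline postings are packed directly into the TermInfo entry;
                // external ones live at (offset, len) in the postings file.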
                let source_postings = if let Some((doc_ids, term_freqs)) = term_info.decode_inline()
                {
                    // Inline posting list - no I/O needed
                    let mut pl = PostingList::with_capacity(doc_ids.len());
                    for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                        pl.push(doc_id, tf);
                    }
                    BlockPostingList::from_posting_list(&pl)?
                } else {
                    // External posting list
                    let (offset, len) = term_info.external_info().unwrap();
                    let posting_bytes = segment.read_postings(offset, len).await?;
                    BlockPostingList::deserialize(&mut posting_bytes.as_slice())?
                };

                // Merge into combined posting list with remapped doc IDs
                let merged = all_terms.entry(key).or_default();
                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
            doc_offset += segment.num_docs();
        }

        // Write merged term dictionary and postings
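        // The SSTable gets one TermInfo per term: small posting lists are inlined
        // into the entry, larger ones are serialized into `postings_out` and
        // referenced by offset and length.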
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);

        for (key, posting_list) in &all_terms {
            // Try to inline small posting lists
            let doc_ids: Vec<u32> = posting_list.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = posting_list.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                // Small posting list - inline it directly
                inline
            } else {
                // Large posting list - write to external file
                let posting_offset = postings_out.len() as u64;
                let block_list = BlockPostingList::from_posting_list(posting_list)?;
                let mut encoded = Vec::new();
                block_list.serialize(&mut encoded)?;
                postings_out.extend_from_slice(&encoded);
                TermInfo::external(
                    posting_offset,
                    encoded.len() as u32,
                    posting_list.doc_count(),
                )
            };

            writer.insert(key, &term_info)?;
        }

        writer.finish()?;
        Ok(())
    }
}

/// Delete segment files from directory
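///
/// Each delete result is ignored, so missing files (e.g. from a partially
/// written segment) do not abort cleanup.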
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    Ok(())
}