hermes_core/segment/merger.rs

//! Segment merger for combining multiple segments
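//!
//! A minimal usage sketch, assuming the surrounding crate provides a concrete
//! `Directory + DirectoryWriter` (`dir`), open `SegmentReader`s (`readers`), a shared
//! `Arc<Schema>` (`schema`), and that `SegmentId` can be built from a raw id like this:
//!
//! ```ignore
//! let merger = SegmentMerger::new(Arc::clone(&schema));
//! let meta = merger.merge(&dir, &readers, SegmentId(7)).await?;
//! println!("merged {} docs into segment {}", meta.num_docs, meta.id);
//! ```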

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::SegmentBuilder;
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;
use crate::structures::{BlockPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo};

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments - uses optimized store stacking when possible
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Check if we can use optimized store stacking (no dictionaries)
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

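        // Stores that use a compression dictionary can't have their blocks stacked
        // verbatim, so any such segment forces the full rebuild path.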
        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

    /// Optimized merge: stack compressed store blocks, rebuild only term dict/postings
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings (still need to rebuild these)
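        // (doc IDs shift when segments are concatenated, so postings can't be copied verbatim)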
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

        // Stack store files without recompression
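        // Segments are appended in the same order they are walked in merge_postings,
        // so stored documents line up with the remapped doc IDs.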
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Merge field stats
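        // Counts are simple sums: each source segment covers a disjoint set of documents.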
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

    /// Fallback merge: rebuild everything from documents
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder = SegmentBuilder::new((*self.schema).clone());

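        // Decode and re-add every document in segment order; the builder re-indexes
        // them and assigns fresh doc IDs as it goes.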
        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

    /// Merge postings from multiple segments
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

        // Collect all terms across all segments with doc ID remapping
        let mut all_terms: BTreeMap<Vec<u8>, PostingList> = BTreeMap::new();
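        // BTreeMap iteration is key-ordered, so the merged terms can be streamed into
        // the SSTable writer below in sorted order.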
        let mut doc_offset = 0u32;

        for segment in segments {
            // Iterate through all terms in this segment
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                // Read the posting list - handle both inline and external
                let source_postings = if let Some((doc_ids, term_freqs)) = term_info.decode_inline()
                {
                    // Inline posting list - no I/O needed
                    let mut pl = PostingList::with_capacity(doc_ids.len());
                    for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                        pl.push(doc_id, tf);
                    }
                    BlockPostingList::from_posting_list(&pl)?
                } else {
                    // External posting list
                    let (offset, len) = term_info.external_info().unwrap();
                    let posting_bytes = segment.read_postings(offset, len).await?;
                    BlockPostingList::deserialize(&mut posting_bytes.as_slice())?
                };

                // Merge into combined posting list with remapped doc IDs
                let merged = all_terms.entry(key).or_default();
                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
            doc_offset += segment.num_docs();
        }

        // Write merged term dictionary and postings
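        // Small posting lists are inlined into the TermInfo entry itself; larger ones
        // are serialized to the postings buffer and referenced by offset and length.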
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);

        for (key, posting_list) in &all_terms {
            // Try to inline small posting lists
            let doc_ids: Vec<u32> = posting_list.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = posting_list.iter().map(|p| p.term_freq).collect();

            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                // Small posting list - inline it directly
                inline
            } else {
                // Large posting list - write to external file
                let posting_offset = postings_out.len() as u64;
                let block_list = BlockPostingList::from_posting_list(posting_list)?;
                let mut encoded = Vec::new();
                block_list.serialize(&mut encoded)?;
                postings_out.extend_from_slice(&encoded);
                TermInfo::external(
                    posting_offset,
                    encoded.len() as u32,
                    posting_list.doc_count(),
                )
            };

            writer.insert(key, &term_info)?;
        }

        writer.finish()?;
        Ok(())
    }
}

/// Delete segment files from directory
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
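    // Best-effort cleanup: delete failures (e.g. already-missing files) are ignored.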
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    Ok(())
}