hermes_core/segment/merger.rs

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;
use crate::structures::{BlockPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo};
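/// Merges several immutable segments into a single new segment.
///
/// A minimal usage sketch (the directory, schema, and reader setup are
/// assumed to exist elsewhere in the crate, and `42` stands in for whatever
/// inner id `SegmentId` wraps):
///
/// ```ignore
/// let merger = SegmentMerger::new(Arc::clone(&schema));
/// let readers: Vec<SegmentReader> = /* open the segments to merge */;
/// let meta = merger.merge(&dir, &readers, SegmentId(42)).await?;
/// // The merged segment reports the combined document count.
/// assert_eq!(meta.num_docs, readers.iter().map(|r| r.num_docs()).sum::<u32>());
/// ```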
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

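    /// Merges `segments` into one segment identified by `new_segment_id`.
    ///
    /// Two strategies are used: when no source store carries a compression
    /// dictionary, the stores can be stacked byte-for-byte and only the
    /// postings are re-merged (`merge_optimized`); otherwise every document
    /// is read back and re-indexed through a fresh builder (`merge_rebuild`).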
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Stores can only be stacked verbatim when none of them uses a
        // compression dictionary.
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

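    /// Fast path: merges postings into new buffers and appends each source
    /// store's raw blocks unchanged, so stored documents are never
    /// decompressed. Field statistics and document counts are summed into
    /// the new `SegmentMeta`.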
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Merge term dictionaries and posting lists into fresh buffers.
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

        // Stack the document stores block-by-block, without recompressing.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Field statistics are additive, so the merged stats are simple sums.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

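    /// Slow path: replays every stored document from every source segment
    /// through a fresh `SegmentBuilder`, which re-indexes them from scratch
    /// before writing the new segment.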
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        // Re-add every document from every source segment in order, so the
        // builder assigns fresh contiguous doc IDs.
        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

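    /// Merges the term dictionaries and posting lists of all source segments.
    ///
    /// Doc IDs are remapped by adding the cumulative document count of the
    /// preceding segments: if segment A holds 3 docs, a posting for doc 1 in
    /// segment B becomes doc 4 in the merged segment. Terms are collected in
    /// a `BTreeMap`, so the dictionary is written in sorted key order, as the
    /// SSTable format requires.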
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

        // Accumulate the merged posting list for every term, keyed by the
        // raw term bytes.
        let mut all_terms: BTreeMap<Vec<u8>, PostingList> = BTreeMap::new();
        let mut doc_offset = 0u32;

        for segment in segments {
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                // Small posting lists are inlined in the term dictionary;
                // larger ones live in the postings file.
                let source_postings = if let Some((doc_ids, term_freqs)) =
                    term_info.decode_inline()
                {
                    let mut pl = PostingList::with_capacity(doc_ids.len());
                    for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                        pl.push(doc_id, tf);
                    }
                    BlockPostingList::from_posting_list(&pl)?
                } else {
                    let (offset, len) = term_info.external_info().unwrap();
                    let posting_bytes = segment.read_postings(offset, len).await?;
                    BlockPostingList::deserialize(&mut posting_bytes.as_slice())?
                };

                // Append to the merged list, remapping doc IDs into the new
                // segment's space.
                let merged = all_terms.entry(key).or_default();
                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
            doc_offset += segment.num_docs();
        }

        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);

        for (key, posting_list) in &all_terms {
            let doc_ids: Vec<u32> = posting_list.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = posting_list.iter().map(|p| p.term_freq).collect();

            // Try to inline the merged list back into the term dictionary;
            // fall back to serializing it into the postings buffer.
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings_out.len() as u64;
                let block_list = BlockPostingList::from_posting_list(posting_list)?;
                let mut encoded = Vec::new();
                block_list.serialize(&mut encoded)?;
                postings_out.extend_from_slice(&encoded);
                TermInfo::external(
                    posting_offset,
                    encoded.len() as u32,
                    posting_list.doc_count(),
                )
            };

            writer.insert(key, &term_info)?;
        }

        writer.finish()?;
        Ok(())
    }
}

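/// Best-effort removal of all files belonging to a segment. Individual
/// delete errors are ignored so that a missing file does not abort cleanup
/// of the remaining ones.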
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    Ok(())
}