use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;
use crate::structures::{BlockPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo};

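/// Merges multiple on-disk segments into a single new segment.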
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

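    /// Merge `segments` into a new segment identified by `new_segment_id`.
    ///
    /// If no source segment's store uses a dictionary, the stores can be
    /// stacked as-is and the faster `merge_optimized` path is taken;
    /// otherwise every document is re-added through `merge_rebuild`.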
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

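    /// Fast path: merge postings term-by-term and append the segments' raw
    /// store blocks instead of re-encoding every document.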
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

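        // Merge term dictionaries and posting lists from all source segments.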
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

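        // Stack the document stores by appending each segment's raw store blocks.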
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

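        // Persist the merged term dictionary, postings, and store.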
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

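        // Combine per-field statistics from every source segment.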
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

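    /// Slow path: rebuild the segment from scratch by feeding every document
    /// from the source segments into a fresh `SegmentBuilder`.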
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

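    /// Merge the term dictionaries of all segments. Doc ids are remapped by
    /// each segment's doc-id offset, merged postings are appended to
    /// `postings_out`, and the merged term dictionary is written into
    /// `term_dict`.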
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

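        // For every term, collect the (segment index, TermInfo, doc-id offset)
        // triples that contribute postings. The BTreeMap keeps terms in sorted
        // key order.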
        let mut term_sources: BTreeMap<Vec<u8>, Vec<(usize, TermInfo, u32)>> = BTreeMap::new();
        let mut doc_offset = 0u32;

        for (seg_idx, segment) in segments.iter().enumerate() {
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                term_sources
                    .entry(key)
                    .or_default()
                    .push((seg_idx, term_info, doc_offset));
            }
            doc_offset += segment.num_docs();
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::with_capacity(term_sources.len());

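        // Terms that occur in a single segment are copied with their doc ids
        // shifted; terms that occur in several segments are merged.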
        for (key, sources) in term_sources {
            let term_info = if sources.len() == 1 {
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((key, term_info));
        }

        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(())
    }

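    /// Copy a term's postings from a single source segment, shifting every
    /// doc id by `doc_offset`. Small lists are re-inlined into the returned
    /// `TermInfo` when possible; otherwise the postings are re-encoded and
    /// appended to `postings_out`.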
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

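        // External postings: decode, remap doc ids, and re-encode.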
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

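    /// Merge a term's postings coming from more than one segment. When every
    /// source is an external block posting list, the blocks are concatenated
    /// directly; otherwise the sources are decoded into a single posting list
    /// with remapped doc ids and re-encoded.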
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

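        // Fast path: every source is already an external block posting list,
        // so the blocks can be concatenated directly.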
        if all_external && sorted_sources.len() > 1 {
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

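        // Fallback: decode every source (inline or external) into one posting
        // list with remapped doc ids.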
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }
}

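/// Delete a segment's files. Deletion is best-effort: individual delete
/// errors are ignored.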
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    Ok(())
}