use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

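/// Merges multiple on-disk segments into a single new segment.
///
/// Depending on the source segments, the merge either stacks already
/// serialized data (postings, store blocks, vector indexes) or falls back to
/// re-indexing every document through a [`SegmentBuilder`].
///
/// Illustrative usage (a sketch only; it assumes the caller already has an
/// open directory, open `SegmentReader`s, and a freshly allocated segment id):
///
/// ```ignore
/// let merger = SegmentMerger::new(schema.clone());
/// let meta = merger.merge(&dir, &segment_readers, next_segment_id).await?;
/// println!("merged {} docs into segment {}", meta.num_docs, meta.id);
/// ```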
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

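    /// Merges `segments` into a new segment identified by `new_segment_id`.
    ///
    /// If none of the source stores uses a dictionary (`store_has_dict()` is
    /// false for all of them), their store blocks can be byte-stacked and the
    /// fast path in `merge_optimized` is taken; otherwise every document is
    /// re-indexed via `merge_rebuild`.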
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized(dir, segments, new_segment_id).await
        } else {
            self.merge_rebuild(dir, segments, new_segment_id).await
        }
    }

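    /// Fast-path merge: merges term dictionaries and postings, byte-stacks the
    /// document stores, merges dense-vector indexes, and sums per-field
    /// statistics, without re-indexing any document.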
    async fn merge_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        self.merge_postings(segments, &mut term_dict_data, &mut postings_data)
            .await?;

        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        self.merge_dense_vectors(dir, segments, &files).await?;

        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += stats.total_tokens;
                entry.doc_count += stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

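    /// Fallback merge: reads every stored document from the source segments
    /// and feeds it through a fresh `SegmentBuilder`, which writes the new
    /// segment's files itself.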
    async fn merge_rebuild<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }
            }
        }

        builder.build(dir, new_segment_id).await
    }

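    /// Merges term dictionaries and posting lists from all segments.
    ///
    /// Terms are grouped in a `BTreeMap` so the output dictionary stays sorted
    /// by term bytes. Each source records the doc-id offset of its segment,
    /// which is added to every document id during the merge. Terms found in a
    /// single segment are copied (and remapped) via `copy_term_posting`; terms
    /// found in several segments go through `merge_term_postings`.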
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
    ) -> Result<()> {
        use std::collections::BTreeMap;

        let mut term_sources: BTreeMap<Vec<u8>, Vec<(usize, TermInfo, u32)>> = BTreeMap::new();
        let mut doc_offset = 0u32;

        for (seg_idx, segment) in segments.iter().enumerate() {
            let terms = segment.all_terms().await?;
            for (key, term_info) in terms {
                term_sources
                    .entry(key)
                    .or_default()
                    .push((seg_idx, term_info, doc_offset));
            }
            doc_offset += segment.num_docs();
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::with_capacity(term_sources.len());

        for (key, sources) in term_sources {
            let term_info = if sources.len() == 1 {
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((key, term_info));
        }

        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(())
    }

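    /// Copies a term that occurs in exactly one source segment, remapping its
    /// document ids by `doc_offset`. The result stays inline if it still fits
    /// in a `TermInfo`; otherwise a block posting list is appended to
    /// `postings_out` and an external `TermInfo` is returned.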
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

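    /// Merges a term that occurs in several source segments.
    ///
    /// Sources are sorted by their doc-id offset. When every source already
    /// has an external block posting list, the blocks are concatenated
    /// directly; otherwise the postings are decoded, remapped, and re-encoded
    /// (or inlined if small enough).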
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }
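
    /// Merges dense-vector indexes for every `DenseVector` field.
    ///
    /// When every segment that has an index for the field exposes an
    /// IVF-RaBitQ index, those indexes are merged with per-segment doc-id
    /// offsets. Otherwise (or if that merge fails) a new RaBitQ index is
    /// rebuilt from the raw vectors. All per-field indexes are written into a
    /// single vectors file with a small fixed-size header.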
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = serde_json::to_vec(&merged)
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, index_bytes));
            }
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _)| *id);

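            // File layout: a u32 field count, then one (u32 field_id,
            // u64 offset, u64 length) header entry per field, followed by the
            // serialized index payloads in the same order.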
            let header_size = 4 + field_indexes.len() * (4 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            dir.write(&files.vectors, &output).await?;
        }

        Ok(())
    }
}

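/// Best-effort deletion of all files belonging to a segment; individual
/// delete errors are ignored.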
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    Ok(())
}