use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

/// Statistics gathered while merging segments into a new segment.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of distinct terms written to the merged term dictionary.
    pub terms_processed: usize,
    /// Number of posting lists merged across the input segments.
    pub postings_merged: usize,
    /// Peak estimated heap usage observed during the merge, in bytes.
    pub peak_memory_bytes: usize,
    /// Estimated heap usage at the most recent measurement point, in bytes.
    pub current_memory_bytes: usize,
    /// Size of the merged term dictionary, in bytes.
    pub term_dict_bytes: usize,
    /// Size of the merged postings data, in bytes.
    pub postings_bytes: usize,
    /// Size of the merged document store, in bytes.
    pub store_bytes: usize,
    /// Size of the merged dense-vector index data, in bytes.
    pub vectors_bytes: usize,
}

impl MergeStats {
    /// Formats a byte count as a human-readable string ("512 B", "2.00 KB", "3.00 MB", ...).
    pub fn format_memory(bytes: usize) -> String {
        if bytes >= 1024 * 1024 * 1024 {
            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
        } else if bytes >= 1024 * 1024 {
            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
        } else if bytes >= 1024 {
            format!("{:.2} KB", bytes as f64 / 1024.0)
        } else {
            format!("{} B", bytes)
        }
    }
}
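
// Sanity check of the unit thresholds in `format_memory` above. The values are
// illustrative only and are not taken from real merge runs.
#[cfg(test)]
mod merge_stats_tests {
    use super::MergeStats;

    #[test]
    fn format_memory_picks_the_right_unit() {
        assert_eq!(MergeStats::format_memory(512), "512 B");
        assert_eq!(MergeStats::format_memory(2048), "2.00 KB");
        assert_eq!(MergeStats::format_memory(3 * 1024 * 1024), "3.00 MB");
    }
}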

/// One pending entry in the k-way merge heap: the next term from one segment's
/// term dictionary, together with that segment's document-ID offset.
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare in reverse so that `BinaryHeap`, a max-heap, pops the smallest
        // key first and the merge emits terms in ascending order.
        other.key.cmp(&self.key)
    }
}

/// Merges multiple segments into a single new segment.
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }
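
    // Illustrative call site (hypothetical `dir`, `readers`, and segment id; the
    // actual orchestration lives with the caller that owns the segment readers):
    //
    //     let merger = SegmentMerger::new(schema.clone());
    //     let (meta, stats) = merger
    //         .merge_with_stats(&dir, &readers, SegmentId(42))
    //         .await?;
    //     log::info!(
    //         "merged {} docs, peak memory {}",
    //         meta.num_docs,
    //         MergeStats::format_memory(stats.peak_memory_bytes)
    //     );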

    /// Merges `segments` into a new segment, discarding the collected statistics.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
        Ok(meta)
    }

    /// Merges `segments` into a new segment and returns the resulting metadata
    /// together with [`MergeStats`].
    ///
    /// When none of the input document stores uses a dictionary, the stores can be
    /// stacked block-by-block without re-encoding; otherwise the segment is rebuilt
    /// document by document.
    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized_with_stats(dir, segments, new_segment_id)
                .await
        } else {
            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
                .await
        }
    }

    /// Fast-path merge: k-way merges the term dictionaries and postings, stacks the
    /// document stores block-by-block, and merges dense-vector indexes.
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Merge term dictionaries and postings into in-memory buffers.
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Stack the document stores without re-encoding their blocks.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Release the large buffers before merging vectors.
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // Combine per-field statistics from all input segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
        );

        Ok((meta, stats))
    }

    /// Slow-path merge: re-adds every document from every input segment to a fresh
    /// [`SegmentBuilder`] and builds the new segment from scratch.
    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();

        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }

                // Sample the builder's memory usage periodically.
                if doc_id % 10000 == 0 {
                    let builder_stats = builder.stats();
                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
                    stats.peak_memory_bytes =
                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
                }
            }
        }

        let meta = builder.build(dir, new_segment_id).await?;
        Ok((meta, stats))
    }

    /// K-way merges the term dictionaries of all segments.
    ///
    /// Terms are drawn in ascending order via a min-heap of [`MergeEntry`]. A term
    /// that appears in a single segment has its postings copied with remapped doc
    /// IDs; a term that appears in several segments has its postings merged.
    /// Returns the number of terms written.
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Each segment's doc IDs are shifted by the total doc count of the segments before it.
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with the first term from every segment.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect every segment that contains the current term.
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            let term_info = if sources.len() == 1 {
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        log::info!(
            "Merge complete: {} terms processed from {} segments",
            terms_processed,
            segments.len()
        );

        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        // Write the merged terms into the SSTable term dictionary.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

    /// Copies a single-segment posting list into the output, remapping doc IDs by
    /// `doc_offset`. Small lists are kept inline in the [`TermInfo`] when possible.
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // The remapped IDs no longer fit inline: spill them to an external block list.
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External posting list: decode, remap doc IDs, and re-encode.
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

    /// Merges one term's posting lists from multiple segments into the output,
    /// remapping each segment's doc IDs by its offset.
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sort by doc offset so the concatenated postings stay in ascending doc-ID order.
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: every source is already block-encoded, so the blocks can be
            // concatenated directly.
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Mixed inline/external sources: decode everything into one posting list.
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }

    /// Merges dense-vector indexes for every dense-vector field.
    ///
    /// For each field it tries, in order: merging prebuilt ScaNN-style IVF-PQ
    /// indexes, merging IVF-RaBitQ indexes, and finally rebuilding a RaBitQ index
    /// from raw vectors. Returns the number of bytes written to the vectors file.
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index_type, serialized index) for every merged field.
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fallback: rebuild a RaBitQ index from the raw vectors of all segments.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes));
            }
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // File layout: field count (u32), then per field a fixed-size header of
            // field_id (u32), index type (u8), offset (u64), and length (u64),
            // followed by the concatenated index blobs.
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}

/// Deletes all files belonging to a segment, ignoring individual delete errors.
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    Ok(())
}
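
// Illustrative check of the invariant the k-way merge relies on: with the reversed
// `Ord` used by `MergeEntry`, `BinaryHeap` (a max-heap) pops keys in ascending
// order. A stripped-down key-only entry is used here because constructing a real
// `MergeEntry` requires a `TermInfo`.
#[cfg(test)]
mod merge_entry_order_tests {
    use std::cmp::Ordering;
    use std::collections::BinaryHeap;

    struct KeyEntry(Vec<u8>);

    impl PartialEq for KeyEntry {
        fn eq(&self, other: &Self) -> bool {
            self.0 == other.0
        }
    }

    impl Eq for KeyEntry {}

    impl PartialOrd for KeyEntry {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Ord for KeyEntry {
        fn cmp(&self, other: &Self) -> Ordering {
            // Same reversal as `MergeEntry::cmp`.
            other.0.cmp(&self.0)
        }
    }

    #[test]
    fn reversed_ord_pops_smallest_key_first() {
        let mut heap = BinaryHeap::new();
        for key in [b"beta".to_vec(), b"alpha".to_vec(), b"gamma".to_vec()] {
            heap.push(KeyEntry(key));
        }
        assert_eq!(heap.pop().unwrap().0, b"alpha".to_vec());
        assert_eq!(heap.pop().unwrap().0, b"beta".to_vec());
        assert_eq!(heap.pop().unwrap().0, b"gamma".to_vec());
    }
}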