use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

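// Segment merging. `SegmentMerger` combines several immutable segments into one,
// either by stitching the existing on-disk structures together (term dictionaries are
// k-way merged, document stores are stacked, vector indexes are merged) or, when that
// is not possible, by replaying every stored document through a fresh `SegmentBuilder`.
// See `merge_with_stats` for how the strategy is chosen.
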
/// Statistics collected while merging segments.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of distinct terms written to the merged term dictionary.
    pub terms_processed: usize,
    /// Number of posting lists merged from multiple source segments.
    pub postings_merged: usize,
    /// Peak transient memory observed during the merge.
    pub peak_memory_bytes: usize,
    /// Most recently sampled transient memory usage.
    pub current_memory_bytes: usize,
    /// Size of the merged term dictionary, in bytes.
    pub term_dict_bytes: usize,
    /// Size of the merged postings, in bytes.
    pub postings_bytes: usize,
    /// Size of the merged document store, in bytes.
    pub store_bytes: usize,
    /// Size of the merged vector indexes, in bytes.
    pub vectors_bytes: usize,
}

impl MergeStats {
    /// Formats a byte count using 1024-based units, e.g. "512 B" or "3.00 MB".
    pub fn format_memory(bytes: usize) -> String {
        if bytes >= 1024 * 1024 * 1024 {
            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
        } else if bytes >= 1024 * 1024 {
            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
        } else if bytes >= 1024 {
            format!("{:.2} KB", bytes as f64 / 1024.0)
        } else {
            format!("{} B", bytes)
        }
    }
}

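/// Heap entry used for the k-way merge of term dictionaries: the current key of one
/// segment's term iterator, together with the doc-id offset to apply when that
/// segment's postings are copied into the merged segment.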
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse the comparison so that `BinaryHeap` (a max-heap) pops the
        // smallest key first, giving min-heap behaviour.
        other.key.cmp(&self.key)
    }
}

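/// Merges multiple source segments into a single new segment. Holds only the schema;
/// the segments to merge are supplied per call.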
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

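    /// Merges `segments` into a new segment identified by `new_segment_id` and returns
    /// its metadata. Convenience wrapper around [`Self::merge_with_stats`] that
    /// discards the collected statistics.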
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
        Ok(meta)
    }

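    /// Merges `segments` and reports [`MergeStats`] alongside the new segment's metadata.
    ///
    /// The cheap "stacking" path is taken only when every source store can be appended
    /// verbatim (no source store uses a store dictionary) and no field indexes
    /// positions; otherwise every document is re-indexed through a fresh
    /// [`SegmentBuilder`].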
    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        let has_positions = self
            .schema
            .fields()
            .any(|(_, entry)| entry.positions.is_some());

        if can_stack_stores && !has_positions {
            self.merge_optimized_with_stats(dir, segments, new_segment_id)
                .await
        } else {
            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
                .await
        }
    }

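    /// Optimized merge: k-way merges the term dictionaries and postings, stacks the
    /// document stores block-by-block, merges the dense-vector indexes, sums the
    /// per-field statistics, and writes the resulting files plus segment metadata.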
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Merge term dictionaries and posting lists.
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Stack the document stores without re-encoding them.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Free the large buffers before merging vectors.
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // Combine per-field statistics from all source segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
        );

        Ok((meta, stats))
    }

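    /// Fallback merge: re-reads every stored document from the source segments and
    /// feeds it through a fresh [`SegmentBuilder`], rebuilding all index structures.
    /// Used when stores cannot be stacked verbatim or when positions are indexed.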
    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();

        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }

                // Sample memory usage periodically to track the peak.
                if doc_id % 10000 == 0 {
                    let builder_stats = builder.stats();
                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
                    stats.peak_memory_bytes =
                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
                }
            }
        }

        let meta = builder.build(dir, new_segment_id).await?;
        Ok((meta, stats))
    }

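    /// K-way merges the term dictionaries of all source segments into `term_dict`,
    /// writing merged posting data into `postings_out` and returning the number of
    /// terms processed.
    ///
    /// Source doc ids are remapped by adding each segment's doc offset (the sum of the
    /// document counts of the segments before it). For example, merging segments with
    /// 100 and 50 documents gives offsets [0, 100], so doc 3 of the second segment
    /// becomes doc 103 in the merged segment. A min-heap over the per-segment term
    /// iterators yields keys in sorted order; equal keys from several segments are
    /// merged into a single posting list.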
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Doc-id offset of each source segment in the merged doc-id space.
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with the first term of every segment.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator we just popped from.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            // Collect every other segment that holds the same term.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            let term_info = if sources.len() == 1 {
                // Only one segment has this term: copy its postings with remapped doc ids.
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        log::info!(
            "Merge complete: {} terms processed from {} segments",
            terms_processed,
            segments.len()
        );

        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        // Write the merged term dictionary; keys arrive in sorted order from the heap.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

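    /// Copies a term's postings from a single source segment, remapping doc ids by
    /// `doc_offset`. Inline postings stay inline when the remapped ids still fit;
    /// otherwise the postings are re-encoded as a block posting list appended to
    /// `postings_out`, and an external [`TermInfo`] pointing at it is returned.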
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Fast path: postings are stored inline in the TermInfo itself.
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // Remapped ids no longer fit inline: spill to an external block posting list.
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External postings: read, remap doc ids, and re-encode.
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

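    /// Merges one term's postings from several source segments into a single posting
    /// list, remapping doc ids by each segment's offset. When every source is already
    /// an external block posting list, the encoded blocks are concatenated directly;
    /// otherwise the postings are decoded, remapped, and re-encoded (or kept inline if
    /// small enough).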
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Merge in ascending doc-offset order so the output doc ids stay sorted.
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        // Fast path: every source already has an external block posting list, so the
        // encoded blocks can be concatenated without decoding individual postings.
        if all_external && sorted_sources.len() > 1 {
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // General path: decode every source, remap doc ids, and rebuild the list.
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }

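    /// Merges the dense-vector indexes of all source segments into the new segment's
    /// vectors file, returning the number of bytes written (0 if there is nothing to
    /// write). Per field, it tries the cheapest strategy first: merge existing ScaNN
    /// (IVF-PQ) indexes, then IVF-RaBitQ indexes, and finally rebuild a flat RaBitQ
    /// index from raw vectors. The file starts with a header (field count, then per
    /// field: field id, index type tag, offset, length) followed by the index blobs.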
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index type tag, serialized index) for every dense-vector field.
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Preferred: merge existing ScaNN (IVF-PQ) indexes directly.
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Next best: merge existing IVF-RaBitQ indexes.
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fallback: rebuild a flat RaBitQ index from the raw vectors.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes));
            }
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // Header: u32 field count, then per field: u32 field id, u8 index type,
            // u64 offset, u64 length. Index data follows the header.
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}

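/// Best-effort deletion of all files belonging to `segment_id`. Individual delete
/// errors are ignored so a partially deleted segment does not abort cleanup.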
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    Ok(())
}