use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

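/// Statistics collected during a segment merge: term counts, estimated memory
/// use, and the size in bytes of each output file.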
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    pub terms_processed: usize,
    pub postings_merged: usize,
    pub peak_memory_bytes: usize,
    pub current_memory_bytes: usize,
    pub term_dict_bytes: usize,
    pub postings_bytes: usize,
    pub store_bytes: usize,
    pub vectors_bytes: usize,
    pub sparse_bytes: usize,
}

impl MergeStats {
    pub fn format_memory(bytes: usize) -> String {
        if bytes >= 1024 * 1024 * 1024 {
            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
        } else if bytes >= 1024 * 1024 {
            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
        } else if bytes >= 1024 {
            format!("{:.2} KB", bytes as f64 / 1024.0)
        } else {
            format!("{} B", bytes)
        }
    }
}

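/// Entry in the k-way merge heap: one term-dictionary key from one segment,
/// together with that segment's doc-id offset in the merged segment.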
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

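// `BinaryHeap` is a max-heap; the comparison is reversed so the heap pops the
// smallest key first.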
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        other.key.cmp(&self.key)
    }
}

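/// Merges multiple on-disk segments into a single new segment.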
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

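    /// Merge `segments` into a new segment identified by `new_segment_id`,
    /// discarding the collected [`MergeStats`].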
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
        Ok(meta)
    }

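    /// Merge `segments` and return both the new segment's metadata and the
    /// [`MergeStats`] collected along the way. The optimized path is chosen
    /// when every store can be stacked and no field indexes positions;
    /// otherwise documents are re-added through a [`SegmentBuilder`].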
    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        let has_positions = self
            .schema
            .fields()
            .any(|(_, entry)| entry.positions.is_some());

        if can_stack_stores && !has_positions {
            self.merge_optimized_with_stats(dir, segments, new_segment_id)
                .await
        } else {
            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
                .await
        }
    }

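    /// Optimized merge path: runs [`Self::merge_core`] with
    /// `DenseVectorStrategy::MergeExisting`.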
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::MergeExisting,
        )
        .await
    }

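    /// Core merge pipeline shared by the optimized and ANN paths: merges the
    /// term dictionaries and postings, stacks the document stores, handles
    /// dense and sparse vectors, then writes the new segment's files and meta.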
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        dense_strategy: DenseVectorStrategy<'_>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        let vectors_bytes = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => {
                self.merge_dense_vectors_with_stats(dir, segments, &files)
                    .await?
            }
            DenseVectorStrategy::BuildAnn(trained) => {
                self.build_ann_vectors(dir, segments, &files, trained)
                    .await?
            }
        };
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self
            .merge_sparse_vectors_optimized(dir, segments, &files)
            .await?;
        stats.sparse_bytes = sparse_bytes;

        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => "Merge",
            DenseVectorStrategy::BuildAnn(_) => "ANN merge",
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
            MergeStats::format_memory(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

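    /// Fallback merge path: re-add every document from the source segments to a
    /// fresh [`SegmentBuilder`]. Used when stores cannot be stacked or when
    /// positions are indexed.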
    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();

        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }

                if doc_id % 10000 == 0 {
                    let builder_stats = builder.stats();
                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
                    stats.peak_memory_bytes =
                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
                }
            }
        }

        let meta = builder.build(dir, new_segment_id).await?;
        Ok((meta, stats))
    }

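    /// K-way merge of all segments' term dictionaries using a heap over term
    /// keys. Postings for a term found in a single segment are copied with
    /// remapped doc ids; postings found in several segments are merged.
    /// Returns the number of distinct terms written.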
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            let term_info = if sources.len() == 1 {
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={:.2} MB, peak={:.2} MB",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            postings_out.capacity() as f64 / (1024.0 * 1024.0),
            stats.peak_memory_bytes as f64 / (1024.0 * 1024.0)
        );

        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

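    /// Copy a term's postings from a single source segment, remapping local doc
    /// ids by `doc_offset`. Keeps the postings inline when they still fit,
    /// otherwise writes a block posting list to `postings_out`.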
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

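    /// Merge one term's postings from several segments. When every source is an
    /// external block posting list the blocks are concatenated directly;
    /// otherwise the postings are decoded, remapped, and re-encoded.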
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }
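
    /// Merge dense-vector indexes field by field. Prefers merging existing
    /// ScaNN (IVF-PQ) or IVF-RaBitQ indexes; if that fails or none exist, it
    /// rebuilds a flat RaBitQ index from the raw vectors when they are
    /// available. Returns the number of bytes written to the vectors file.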
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // index type 2 = ScaNN (IVF-PQ)
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // index type 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // index type 0 = flat RaBitQ
            }
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }

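    /// Merge sparse-vector postings dimension by dimension: for every sparse
    /// field, the per-dimension block posting lists of all segments are merged
    /// with remapped doc ids and written out with a per-field dimension table.
    /// Returns the number of bytes written to the sparse file.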
    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for (i, segment) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                segment.num_docs(),
                offset
            );
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            let _max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, segment) in segments.iter().enumerate() {
                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
                        && let Ok(Some(posting_list)) = sparse_index.get_posting(dim_id).await
                    {
                        log::trace!(
                            "Sparse merge dim={}: seg={} offset={} doc_count={} blocks={}",
                            dim_id,
                            seg_idx,
                            doc_offsets[seg_idx],
                            posting_list.doc_count(),
                            posting_list.blocks.len()
                        );
                        posting_arcs.push((posting_list, doc_offsets[seg_idx]));
                    }
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                if lists_with_offsets.is_empty() {
                    continue;
                }

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                log::trace!(
                    "Sparse merge dim={}: merged {} lists -> doc_count={} blocks={}",
                    dim_id,
                    lists_with_offsets.len(),
                    merged.doc_count(),
                    merged.blocks.len()
                );

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        let mut header_size = 4u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += 4 + 1 + 4; // field_id (u32) + quantization (u8) + num_dims (u32)
            header_size += (*num_dims as u64) * 16; // per dimension: dim_id (u32) + offset (u64) + length (u32)
        }

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();

        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());

            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();

            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
                all_data.extend_from_slice(bytes);
            }
            field_tables.push(table);
        }

        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*num_dims)?;

            for &(dim_id, offset, length) in &field_tables[i] {
                output.write_u32::<LittleEndian>(dim_id)?;
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        let output_size = output.len();
        dir.write(&files.sparse, &output).await?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_data.len(),
            output_size
        );

        Ok(output_size)
    }
}

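/// Pre-trained vector quantization structures (coarse centroids and PQ
/// codebooks), keyed by field id, supplied by the caller when ANN indexes are
/// built during a merge.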
pub struct TrainedVectorStructures {
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

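/// How dense vectors are handled during a merge: either merge the segments'
/// existing indexes, or build fresh ANN indexes from pre-trained structures.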
pub enum DenseVectorStrategy<'a> {
    MergeExisting,
    BuildAnn(&'a TrainedVectorStructures),
}

impl SegmentMerger {
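    /// Merge `segments` into a new segment, building ANN indexes for dense
    /// vector fields from the supplied pre-trained structures.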
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self
            .merge_core(
                dir,
                segments,
                new_segment_id,
                DenseVectorStrategy::BuildAnn(trained),
            )
            .await?;
        Ok(meta)
    }

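    /// Build ANN indexes (IVF-RaBitQ or ScaNN, depending on each field's
    /// config) from the flat vectors stored in the source segments, falling
    /// back to a flat index when no trained structures are available for a
    /// field. Returns the number of bytes written to the vectors file.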
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // index type 1 = IVF-RaBitQ
                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // index type 2 = ScaNN (IVF-PQ)
                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = flat_data.to_binary_bytes();
            field_indexes.push((field.0, 4u8, bytes)); // index type 4 = flat (raw vectors)
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}

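/// Best-effort removal of all files belonging to a segment; missing files are
/// ignored.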
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    // Sparse postings are also written during merges (see merge_core), so remove that file too.
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}