use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

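/// Statistics collected while merging segments into a new segment.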
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of distinct terms processed during the merge.
    pub terms_processed: usize,
    /// Number of posting lists merged.
    pub postings_merged: usize,
    /// Peak transient memory used by the merge, in bytes.
    pub peak_memory_bytes: usize,
    /// Current transient memory used by the merge, in bytes.
    pub current_memory_bytes: usize,
    /// Size of the serialized term dictionary, in bytes.
    pub term_dict_bytes: usize,
    /// Size of the serialized postings, in bytes.
    pub postings_bytes: usize,
    /// Size of the serialized document store, in bytes.
    pub store_bytes: usize,
    /// Size of the serialized dense-vector indexes, in bytes.
    pub vectors_bytes: usize,
    /// Size of the serialized sparse-vector indexes, in bytes.
    pub sparse_bytes: usize,
}

impl MergeStats {
    /// Format a byte count as a human-readable string (B, KB, MB, GB).
    pub fn format_memory(bytes: usize) -> String {
        if bytes >= 1024 * 1024 * 1024 {
            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
        } else if bytes >= 1024 * 1024 {
            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
        } else if bytes >= 1024 {
            format!("{:.2} KB", bytes as f64 / 1024.0)
        } else {
            format!("{} B", bytes)
        }
    }
}

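/// Heap entry for the k-way merge over per-segment term dictionary iterators.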
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse the comparison so the max-heap `BinaryHeap` pops the smallest key first.
        other.key.cmp(&self.key)
    }
}

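/// Merges multiple segments into a single new segment.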
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

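    /// Merge segments into `new_segment_id`, discarding the merge statistics.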
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
        Ok(meta)
    }

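    /// Merge segments and return both the new segment's metadata and merge statistics.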
    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        // Document stores can only be stacked block-by-block when no segment uses a store dictionary.
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        // Fields with positions are not handled by the optimized posting copy, so rebuild instead.
        let has_positions = self
            .schema
            .fields()
            .any(|(_, entry)| entry.positions.is_some());

        if can_stack_stores && !has_positions {
            self.merge_optimized_with_stats(dir, segments, new_segment_id)
                .await
        } else {
            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
                .await
        }
    }

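    /// Optimized merge: stream-copy postings, stack document stores, and merge
    /// vector indexes without re-indexing documents.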
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Merge term dictionaries and postings via a k-way merge.
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Stack the document stores without recompressing their blocks.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Free the merged buffers before building vector indexes.
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self
            .merge_sparse_vectors_optimized(dir, segments, &files)
            .await?;
        stats.sparse_bytes = sparse_bytes;

        // Combine per-field statistics from all source segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
            MergeStats::format_memory(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

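    /// Fallback merge: re-add every document through a fresh `SegmentBuilder`.
    /// Used when stores cannot be stacked or when positions must be rebuilt.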
    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();

        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }

                // Sample the builder's memory usage periodically.
                if doc_id % 10000 == 0 {
                    let builder_stats = builder.stats();
                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
                    stats.peak_memory_bytes =
                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
                }
            }
        }

        let meta = builder.build(dir, new_segment_id).await?;
        Ok((meta, stats))
    }

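    /// K-way merge of term dictionaries: streams terms in sorted order, merges or
    /// copies each term's postings, and writes the new term dictionary and postings.
    /// Returns the number of terms processed.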
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Doc-id offset of each segment in the merged doc-id space.
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with the first term from each segment.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect every segment that contains the current term.
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator we just popped from.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            // Pop all other entries with the same key.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            let term_info = if sources.len() == 1 {
                // Only one segment has this term: copy its postings with remapped doc ids.
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        log::info!(
            "Merge complete: {} terms processed from {} segments",
            terms_processed,
            segments.len()
        );

        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        // Write the merged term dictionary in sorted order.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

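    /// Copy a single segment's postings for a term, remapping doc ids by `doc_offset`.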
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Inline postings: remap doc ids and re-inline if they still fit.
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External postings: decode, remap doc ids, and re-encode.
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

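    /// Merge one term's postings from multiple segments into a single posting list,
    /// remapping doc ids by each segment's offset.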
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sources must be concatenated in ascending doc-offset order.
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: concatenate already-encoded blocks without re-encoding postings.
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Slow path: decode every source into a single posting list.
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }

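    /// Merge dense-vector indexes across segments, preferring index-level merges
    /// (IVF-PQ, then IVF-RaBitQ) and falling back to rebuilding a RaBitQ index from
    /// raw vectors. Returns the number of bytes written to the vectors file.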
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index_type, serialized index)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Prefer merging ScaNN (IVF-PQ) indexes when every indexed segment has one.
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // index type 2 = ScaNN (IVF-PQ)
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Next, try merging IVF-RaBitQ indexes.
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // index type 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fallback: collect raw vectors and rebuild a RaBitQ index from scratch.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // index type 0 = RaBitQ
            }
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // Header: field count (u32) + per field: field_id (u32) + type (u8) + offset (u64) + len (u64).
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }

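    /// Merge sparse-vector posting lists dimension by dimension, remapping doc ids,
    /// and write the merged sparse index. Returns the number of bytes written.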
    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        // Doc-id offset of each segment in the merged doc-id space.
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for (i, segment) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                segment.num_docs(),
                offset
            );
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // (field_id, quantization, num_dims, per-dimension serialized posting lists)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Collect every dimension that has postings in any segment.
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for (dim_id, posting) in sparse_index.postings.iter().enumerate() {
                        if posting.is_some() {
                            all_dims.insert(dim_id as u32);
                        }
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            let max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge each dimension's posting lists across segments.
            for dim_id in all_dims {
                let mut lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = Vec::new();

                for (seg_idx, segment) in segments.iter().enumerate() {
                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
                        && let Some(Some(posting_list)) = sparse_index.postings.get(dim_id as usize)
                    {
                        log::trace!(
                            "Sparse merge dim={}: seg={} offset={} doc_count={} blocks={}",
                            dim_id,
                            seg_idx,
                            doc_offsets[seg_idx],
                            posting_list.doc_count(),
                            posting_list.blocks.len()
                        );
                        lists_with_offsets.push((posting_list.as_ref(), doc_offsets[seg_idx]));
                    }
                }

                if lists_with_offsets.is_empty() {
                    continue;
                }

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                log::trace!(
                    "Sparse merge dim={}: merged {} lists -> doc_count={} blocks={}",
                    dim_id,
                    lists_with_offsets.len(),
                    merged.doc_count(),
                    merged.blocks.len()
                );

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field.0, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header: field count (u32) + per field: field_id (u32) + quantization (u8) +
        // num_dims (u32), followed by one (offset: u64, length: u32) entry per dimension.
        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*max_dim_id as u64) * 12;
        }

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        // Lay out the per-dimension data and record (offset, length) for each dimension.
        for (_, _, max_dim_id, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }
            field_tables.push(table);
        }

        // Write the per-field headers and dimension tables.
        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        let output_size = output.len();
        dir.write(&files.sparse, &output).await?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_data.len(),
            output_size
        );

        Ok(output_size)
    }
}

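/// Pre-trained vector quantization structures used when rebuilding ANN indexes
/// during a merge, keyed by field id.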
pub struct TrainedVectorStructures {
    /// Coarse IVF centroids per dense-vector field.
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Product-quantization codebooks per dense-vector field.
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

impl SegmentMerger {
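    /// Merge segments and rebuild ANN vector indexes using pre-trained centroids and codebooks.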
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        // Merge term dictionaries and postings.
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let mut stats = MergeStats::default();
        self.merge_postings_with_stats(
            segments,
            &mut term_dict_data,
            &mut postings_data,
            &mut stats,
        )
        .await?;

        // Stack the document stores.
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        // Build ANN indexes from the trained structures.
        let vectors_bytes = self
            .build_ann_vectors(dir, segments, &files, trained)
            .await?;

        // Combine per-field statistics from all source segments.
        let mut merged_field_stats: rustc_hash::FxHashMap<u32, FieldStats> =
            rustc_hash::FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "ANN merge complete: {} docs, vectors={}",
            total_docs,
            MergeStats::format_memory(vectors_bytes)
        );

        Ok(meta)
    }

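    /// Build per-field ANN vector indexes (IVF-RaBitQ or ScaNN) from flat vectors using
    /// the pre-trained structures, falling back to a flat index when none are available.
    /// Returns the number of bytes written to the vectors file.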
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            // Collect flat vectors and remapped (doc_id, ordinal) pairs from all segments.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // index type 1 = IVF-RaBitQ
                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // index type 2 = ScaNN (IVF-PQ)
                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            // Fallback: keep the vectors as a flat index.
            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = serde_json::to_vec(&flat_data)
                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
            field_indexes.push((field.0, 3u8, bytes)); // index type 3 = flat
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // Header: field count (u32) + per field: field_id (u32) + type (u8) + offset (u64) + len (u64).
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}

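/// Delete all files belonging to a segment, ignoring files that do not exist.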
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    // Also remove the sparse-vector file that the merger writes alongside the others.
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}