use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

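/// Statistics collected while merging segments: term and posting counts,
/// observed memory use, and the size of each output file.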
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    pub terms_processed: usize,
    pub postings_merged: usize,
    pub peak_memory_bytes: usize,
    pub current_memory_bytes: usize,
    pub term_dict_bytes: usize,
    pub postings_bytes: usize,
    pub store_bytes: usize,
    pub vectors_bytes: usize,
    pub sparse_bytes: usize,
}

impl MergeStats {
    pub fn format_memory(bytes: usize) -> String {
        if bytes >= 1024 * 1024 * 1024 {
            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
        } else if bytes >= 1024 * 1024 {
            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
        } else if bytes >= 1024 {
            format!("{:.2} KB", bytes as f64 / 1024.0)
        } else {
            format!("{} B", bytes)
        }
    }
}

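/// Entry in the k-way merge heap: one term from one source segment's
/// dictionary, together with the doc-id offset to apply when remapping
/// that segment's postings into the merged segment.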
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

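// `BinaryHeap` is a max-heap, so compare keys in reverse to pop the
// lexicographically smallest term first.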
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        other.key.cmp(&self.key)
    }
}

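/// Merges multiple segments into a single new segment, combining term
/// dictionaries, postings, document stores, and vector indexes.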
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

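    /// Merge `segments` into a new segment identified by `new_segment_id`,
    /// discarding the collected [`MergeStats`].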
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
        Ok(meta)
    }

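    /// Merge `segments` and report [`MergeStats`]. Takes the optimized path
    /// when no source store uses a dictionary and no field records positions;
    /// otherwise falls back to re-adding every document to a fresh builder.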
    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        let has_positions = self
            .schema
            .fields()
            .any(|(_, entry)| entry.positions.is_some());

        if can_stack_stores && !has_positions {
            self.merge_optimized_with_stats(dir, segments, new_segment_id)
                .await
        } else {
            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
                .await
        }
    }

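    /// Optimized merge: k-way merges the term dictionaries and postings,
    /// appends the document stores without re-encoding, then merges dense
    /// and sparse vector data and writes the combined segment files.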
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self
            .merge_sparse_vectors_optimized(dir, segments, &files)
            .await?;
        stats.sparse_bytes = sparse_bytes;

        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
            MergeStats::format_memory(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

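    /// Fallback merge: reads every stored document from the source segments
    /// and re-adds it to a fresh [`SegmentBuilder`], then builds the new
    /// segment. Used when stores cannot be stacked or positions are indexed.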
    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();

        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }

                if doc_id % 10000 == 0 {
                    let builder_stats = builder.stats();
                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
                    stats.peak_memory_bytes =
                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
                }
            }
        }

        let meta = builder.build(dir, new_segment_id).await?;
        Ok((meta, stats))
    }

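    /// K-way merge of the source term dictionaries. Each segment contributes
    /// a sorted iterator; a min-heap of [`MergeEntry`] yields terms in order,
    /// postings are remapped by per-segment doc-id offsets, and the merged
    /// dictionary is written as an SSTable. Returns the number of terms.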
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            let term_info = if sources.len() == 1 {
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={:.2} MB, peak={:.2} MB",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            postings_out.capacity() as f64 / (1024.0 * 1024.0),
            stats.peak_memory_bytes as f64 / (1024.0 * 1024.0)
        );

        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

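    /// Copy a term's postings from a single source segment, remapping doc IDs
    /// by `doc_offset`. Keeps the inline encoding when the remapped postings
    /// still fit; otherwise re-encodes them as an external block posting list.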
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

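    /// Merge a term's postings from several source segments. When every source
    /// is an external block posting list, the blocks are concatenated directly;
    /// otherwise the postings are decoded, remapped, and re-encoded.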
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }
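
    /// Merge dense-vector indexes across segments. Prefers merging existing
    /// IVF-PQ (ScaNN) indexes, then IVF-RaBitQ indexes, and otherwise rebuilds
    /// a RaBitQ index from raw vectors. The output file starts with a header
    /// of `(field_id, index_type, offset, len)` entries followed by the
    /// serialized per-field index data. Returns the number of bytes written.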
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes));
            }
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }

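    /// Merge sparse-vector posting lists across segments. For every active
    /// dimension, the per-segment block posting lists are merged with doc-id
    /// offsets applied; the output file is a per-field header (field id,
    /// quantization, dimension count) followed by a dimension table of
    /// `(offset, len)` entries and the concatenated posting data. Returns the
    /// number of bytes written.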
    async fn merge_sparse_vectors_optimized<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for (i, segment) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                segment.num_docs(),
                offset
            );
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            let max_dim_id = all_dims.iter().max().copied().unwrap_or(0);
            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, segment) in segments.iter().enumerate() {
                    if let Some(sparse_index) = segment.sparse_indexes().get(&field.0)
                        && let Ok(Some(posting_list)) = sparse_index.get_posting_blocking(dim_id)
                    {
                        log::trace!(
                            "Sparse merge dim={}: seg={} offset={} doc_count={} blocks={}",
                            dim_id,
                            seg_idx,
                            doc_offsets[seg_idx],
                            posting_list.doc_count(),
                            posting_list.blocks.len()
                        );
                        posting_arcs.push((posting_list, doc_offsets[seg_idx]));
                    }
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                if lists_with_offsets.is_empty() {
                    continue;
                }

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                log::trace!(
                    "Sparse merge dim={}: merged {} lists -> doc_count={} blocks={}",
                    dim_id,
                    lists_with_offsets.len(),
                    merged.doc_count(),
                    merged.blocks.len()
                );

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            field_data.push((field.0, quantization, max_dim_id + 1, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        let mut header_size = 4u64;
        for (_, _, max_dim_id, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*max_dim_id as u64) * 12;
        }

        let mut output = Vec::new();
        output.write_u32::<LittleEndian>(field_data.len() as u32)?;

        let mut current_offset = header_size;
        let mut all_data: Vec<u8> = Vec::new();
        let mut field_tables: Vec<Vec<(u64, u32)>> = Vec::new();

        for (_, _, max_dim_id, dim_bytes) in &field_data {
            let mut table: Vec<(u64, u32)> = vec![(0, 0); *max_dim_id as usize];

            for dim_id in 0..*max_dim_id {
                if let Some(bytes) = dim_bytes.get(&dim_id) {
                    table[dim_id as usize] = (current_offset, bytes.len() as u32);
                    current_offset += bytes.len() as u64;
                    all_data.extend_from_slice(bytes);
                }
            }
            field_tables.push(table);
        }

        for (i, (field_id, quantization, max_dim_id, _)) in field_data.iter().enumerate() {
            output.write_u32::<LittleEndian>(*field_id)?;
            output.write_u8(*quantization as u8)?;
            output.write_u32::<LittleEndian>(*max_dim_id)?;

            for &(offset, length) in &field_tables[i] {
                output.write_u64::<LittleEndian>(offset)?;
                output.write_u32::<LittleEndian>(length)?;
            }
        }

        output.extend_from_slice(&all_data);

        let output_size = output.len();
        dir.write(&files.sparse, &output).await?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_data.len(),
            output_size
        );

        Ok(output_size)
    }
}

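/// Pre-trained vector quantization structures, keyed by field id: coarse
/// centroids for IVF assignment and PQ codebooks for ScaNN-style indexes.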
pub struct TrainedVectorStructures {
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

impl SegmentMerger {
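    /// Merge `segments` into a new segment and build ANN vector indexes from
    /// the supplied pre-trained centroids and codebooks instead of merging the
    /// source segments' vector indexes.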
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(new_segment_id.0);

        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let mut stats = MergeStats::default();
        self.merge_postings_with_stats(
            segments,
            &mut term_dict_data,
            &mut postings_data,
            &mut stats,
        )
        .await?;

        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        let vectors_bytes = self
            .build_ann_vectors(dir, segments, &files, trained)
            .await?;

        let mut merged_field_stats: rustc_hash::FxHashMap<u32, FieldStats> =
            rustc_hash::FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "ANN merge complete: {} docs, vectors={}",
            total_docs,
            MergeStats::format_memory(vectors_bytes)
        );

        Ok(meta)
    }

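    /// Collect raw vectors from the flat indexes of every segment and build a
    /// per-field ANN index using the trained structures: IVF-RaBitQ when
    /// centroids are available, IVF-PQ (ScaNN) when centroids and a codebook
    /// are available, falling back to flat storage otherwise. Writes the same
    /// header layout as [`Self::merge_dense_vectors_with_stats`].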
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;
        use byteorder::{LittleEndian, WriteBytesExt};

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = serde_json::to_vec(&flat_data)
                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
            field_indexes.push((field.0, 3u8, bytes));
        }

        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}

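/// Best-effort removal of the files belonging to `segment_id`; individual
/// delete failures are ignored.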
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    // Sparse posting data is written to its own file during merge, so remove it too.
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}