1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::io::Write;
6use std::mem::size_of;
7use std::sync::Arc;
8
9use rustc_hash::FxHashMap;
10
11use super::reader::SegmentReader;
12use super::store::StoreMerger;
13use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
14use crate::Result;
15use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
16use crate::dsl::{FieldType, Schema};
17use crate::structures::{
18 BlockPostingList, PositionPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter,
19 TERMINATED, TermInfo,
20};
21
/// A [`StreamingWriter`] wrapper that counts every byte written through it.
///
/// Merge code needs to know byte offsets into the files it is producing (to
/// record posting/position/vector locations) without seeking; this running
/// counter is the single source of truth for the current output position.
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    // Total bytes successfully written so far; advanced in `Write::write`.
    offset: u64,
}
30
31impl OffsetWriter {
32 fn new(inner: Box<dyn StreamingWriter>) -> Self {
33 Self { inner, offset: 0 }
34 }
35
36 fn offset(&self) -> u64 {
38 self.offset
39 }
40
41 fn finish(self) -> std::io::Result<()> {
43 self.inner.finish()
44 }
45}
46
47impl Write for OffsetWriter {
48 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
49 let n = self.inner.write(buf)?;
50 self.offset += n as u64;
51 Ok(n)
52 }
53
54 fn flush(&mut self) -> std::io::Result<()> {
55 self.inner.flush()
56 }
57}
58
/// Renders a byte count as a human-readable string using binary (1024-based)
/// units: `B`, `KB`, `MB`, or `GB`, with two decimal places above one KB.
fn format_bytes(bytes: usize) -> String {
    const KIB: f64 = 1024.0;
    // Largest unit first; the first threshold that fits wins.
    const SCALES: [(f64, &str); 3] = [(KIB * KIB * KIB, "GB"), (KIB * KIB, "MB"), (KIB, "KB")];
    let value = bytes as f64;
    for &(scale, unit) in &SCALES {
        if value >= scale {
            return format!("{:.2} {}", value / scale, unit);
        }
    }
    format!("{} B", bytes)
}
71
72fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
74 let mut offsets = Vec::with_capacity(segments.len());
75 let mut acc = 0u32;
76 for seg in segments {
77 offsets.push(acc);
78 acc += seg.num_docs();
79 }
80 offsets
81}
82
/// Counters collected while merging segments, reported to callers and logged.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of distinct terms written to the merged term dictionary.
    pub terms_processed: usize,
    /// High-water mark of the tracked in-memory buffer size, in bytes.
    pub peak_memory_bytes: usize,
    /// Currently tracked in-memory buffer size, in bytes.
    pub current_memory_bytes: usize,
    /// Size of the merged term-dictionary file, in bytes.
    pub term_dict_bytes: usize,
    /// Size of the merged postings file, in bytes.
    pub postings_bytes: usize,
    /// Size of the merged document-store file, in bytes.
    pub store_bytes: usize,
    /// Size of the merged dense-vector file, in bytes.
    pub vectors_bytes: usize,
    /// Size of the merged sparse-vector file, in bytes.
    pub sparse_bytes: usize,
}
103
/// One segment's current term in the k-way term-dictionary merge heap.
struct MergeEntry {
    /// Term key bytes as stored in the segment's term dictionary.
    key: Vec<u8>,
    /// The term's posting metadata within its source segment.
    term_info: TermInfo,
    /// Index of the source segment in the `segments` slice.
    segment_idx: usize,
    /// Doc-id base to add to this segment's local doc ids in the merged segment.
    doc_offset: u32,
}
111
112impl PartialEq for MergeEntry {
113 fn eq(&self, other: &Self) -> bool {
114 self.key == other.key
115 }
116}
117
118impl Eq for MergeEntry {}
119
120impl PartialOrd for MergeEntry {
121 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
122 Some(self.cmp(other))
123 }
124}
125
126impl Ord for MergeEntry {
127 fn cmp(&self, other: &Self) -> Ordering {
128 other.key.cmp(&self.key)
130 }
131}
132
/// Pre-trained quantization structures, keyed by field id, supplied by the
/// caller when a merge should build fresh ANN indexes (see
/// [`DenseVectorStrategy::BuildAnn`]) instead of merging existing ones.
pub struct TrainedVectorStructures {
    /// Coarse IVF centroids per dense-vector field id.
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Product-quantization codebooks per dense-vector field id (used by the
    /// ScaNN build path; IVF-RaBitQ only needs centroids).
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}
140
/// How dense-vector indexes are produced for the merged segment.
pub enum DenseVectorStrategy<'a> {
    /// Merge the segments' existing vector indexes, falling back to rebuilding
    /// from raw vectors when index types are mixed or merging fails.
    MergeExisting,
    /// Build fresh ANN indexes from the segments' flat vector data using the
    /// supplied pre-trained structures.
    BuildAnn(&'a TrainedVectorStructures),
}
148
/// Merges multiple on-disk segments into a single new segment: postings,
/// document store, dense-vector indexes, and sparse-vector indexes.
pub struct SegmentMerger {
    // Shared index schema; determines which fields get vector/sparse merging.
    schema: Arc<Schema>,
}
153
154impl SegmentMerger {
155 pub fn new(schema: Arc<Schema>) -> Self {
156 Self { schema }
157 }
158
159 pub async fn merge<D: Directory + DirectoryWriter>(
161 &self,
162 dir: &D,
163 segments: &[SegmentReader],
164 new_segment_id: SegmentId,
165 ) -> Result<(SegmentMeta, MergeStats)> {
166 self.merge_core(
167 dir,
168 segments,
169 new_segment_id,
170 DenseVectorStrategy::MergeExisting,
171 )
172 .await
173 }
174
    /// Shared merge driver: writes postings + term dict + positions, then the
    /// document store, then dense vectors (per `dense_strategy`), then sparse
    /// vectors, and finally the segment metadata file.
    ///
    /// Returns the new segment's metadata and accumulated [`MergeStats`].
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        dense_strategy: DenseVectorStrategy<'_>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        // Sampled before finish(); decides whether a positions file is kept.
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            // No term carried positions: drop the writer and remove the empty
            // file (best-effort delete; ignore failure).
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // Document store: inner scope ends the StoreMerger's mutable borrow of
        // store_writer before the writer itself is finished.
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        // Dictionary-compressed stores must be re-compressed.
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        // Otherwise raw blocks can be appended as-is.
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        let vectors_bytes = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => {
                self.merge_dense_vectors(dir, segments, &files).await?
            }
            DenseVectorStrategy::BuildAnn(trained) => {
                self.build_ann_vectors(dir, segments, &files, trained)
                    .await?
            }
        };
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // Per-field stats are additive across segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        // Metadata is written last, after all data files are durable.
        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => "Merge",
            DenseVectorStrategy::BuildAnn(_) => "ANN merge",
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }
295
    /// K-way merges all segments' term dictionaries into `term_dict`, writing
    /// remapped postings to `postings_out` and positions to `positions_out`.
    ///
    /// Returns the number of distinct terms written.
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with each segment's first term. MergeEntry's reversed
        // Ord makes BinaryHeap behave as a min-heap on the term key.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Merged (key, TermInfo) pairs are buffered so the SSTable can be
        // written in one pass once all posting offsets are known.
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Refill from the popped segment so its next term joins the race.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Drain every other segment that holds the same term key, refilling
            // each as it is consumed.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Rough accounting: only the buffered result vector is tracked (the
        // per-key heap allocations are not counted).
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Keys were produced in ascending order, exactly what the SSTable
        // writer requires.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }
427
    /// Merges one term's postings from `sources` — one
    /// `(segment index, TermInfo, doc-id offset)` per segment holding the term —
    /// writes the combined data, and returns the merged [`TermInfo`].
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        // Sort by doc offset so concatenated doc ids remain ascending.
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: every source already has an on-disk block posting
            // list, so blocks can be concatenated without per-doc re-encoding.
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Slow path: decode every source (inline or external) into one
            // PostingList, remapping doc ids by the segment's offset.
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // A small merged list with no positions may fit inline in the
            // TermInfo itself, avoiding a postings-file entry entirely.
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // Merge per-term position data when any source carries it.
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }
530
    /// Writes the merged dense-vector file by merging each field's existing ANN
    /// indexes when possible — ScaNN first, then IVF-RaBitQ — and otherwise
    /// rebuilding a flat RaBitQ index from segments' retained raw vectors.
    ///
    /// Returns bytes written (0 when no field produced an index).
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        // (field_id, index-type tag, serialized index); tags used here:
        // 0 = RaBitQ (rebuilt), 1 = IVF-RaBitQ, 2 = ScaNN. See write_vector_file.
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            // Merge ScaNN indexes only if every segment that has a dense index
            // for this field has a ScaNN one (no mixed index types), and at
            // least one exists.
            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        // Fall through to the IVF path below.
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            // Same all-or-nothing rule for IVF-RaBitQ indexes.
            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Last resort: rebuild a flat RaBitQ index from raw vectors.
            // Segments whose index retained no raw vectors contribute nothing
            // here — NOTE(review): their vectors appear to be dropped in that
            // case; confirm raw_vectors is always populated when this path runs.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes));
            }
        }

        write_vector_file(dir, files, field_indexes).await
    }
640
    /// Merges per-dimension sparse posting lists across segments and writes the
    /// segment's sparse-vector file.
    ///
    /// File layout: `[num_fields: u32]`, then per field
    /// `[field_id: u32][quantization: u8][num_dims: u32]` followed by `num_dims`
    /// entries of `[dim_id: u32][offset: u64][len: u32]`; all posting-list
    /// bytes follow the complete header at the recorded absolute offsets.
    ///
    /// Returns bytes written (0 when there are no sparse fields or no data).
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // (field_id, weight quantization, dimension count, per-dimension
        // serialized posting lists).
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Union of active dimension ids across all segments.
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read every segment's posting lists up front: one map per
            // segment (empty map for segments without this field), indexed by
            // dimension id.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for dim_id in all_dims {
                // Collect this dimension's posting list from every segment that
                // has it, paired with the segment's doc-id offset.
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Release the bulk-read postings before processing the next field.
            drop(segment_postings);

            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header size is computable up front, so absolute data offsets can be
        // assigned before anything is written.
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += per_field_header as u64;
            header_size += (*num_dims as u64) * per_dim_entry as u64;
        }

        // Assign each dimension's byte range in (field order, ascending dim id)
        // order — the data section below must iterate identically.
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        // Header section.
        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Data section: mirrors the offset-assignment iteration order exactly.
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
829
830 pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
832 &self,
833 dir: &D,
834 segments: &[SegmentReader],
835 new_segment_id: SegmentId,
836 trained: &TrainedVectorStructures,
837 ) -> Result<(SegmentMeta, MergeStats)> {
838 self.merge_core(
839 dir,
840 segments,
841 new_segment_id,
842 DenseVectorStrategy::BuildAnn(trained),
843 )
844 .await
845 }
846
    /// Builds fresh ANN indexes for every indexed dense-vector field from the
    /// segments' flat vector data, using the caller's pre-trained centroids and
    /// codebooks; fields without trained structures fall back to flat storage.
    ///
    /// Returns the number of bytes written to the vectors file.
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;

        // (field_id, index-type tag, serialized index); tags used here:
        // 1 = IVF-RaBitQ, 2 = ScaNN, 4 = flat. See write_vector_file.
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            // Collect flat vectors from all segments, remapping local doc ids
            // into the merged doc-id space via each segment's base offset.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                // Advance even for segments without flat data so later
                // segments keep the correct base.
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            // ANN builders take plain doc ids; ordinals survive only in the
            // flat fallback representation.
            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    // Requires pre-trained coarse centroids for this field.
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    // Requires both centroids and a PQ codebook for this field.
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            // Fallback: no trained structures (or unsupported index type) —
            // keep the vectors flat so they remain usable later.
            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = flat_data.to_binary_bytes();
            field_indexes.push((field.0, 4u8, bytes));
        }

        write_vector_file(dir, files, field_indexes).await
    }
982}
983
/// Writes the per-field vector indexes to the segment's vectors file.
///
/// Layout: `[num_fields: u32]`, then `num_fields` directory entries of
/// `[field_id: u32][index_type: u8][offset: u64][len: u64]`, followed by each
/// field's serialized index bytes at the recorded absolute offsets. Entries
/// are sorted by field id.
///
/// Returns bytes written; when `field_indexes` is empty, writes nothing and
/// returns 0.
async fn write_vector_file<D: Directory + DirectoryWriter>(
    dir: &D,
    files: &SegmentFiles,
    mut field_indexes: Vec<(u32, u8, Vec<u8>)>,
) -> Result<usize> {
    use byteorder::{LittleEndian, WriteBytesExt};

    if field_indexes.is_empty() {
        return Ok(0);
    }

    field_indexes.sort_by_key(|(id, _, _)| *id);

    let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

    // Header size is known up front, so data offsets can be assigned while
    // the directory entries are being written.
    let per_field_entry = size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
    let header_size = size_of::<u32>() + field_indexes.len() * per_field_entry;
    writer.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

    let mut current_offset = header_size as u64;
    for (field_id, index_type, data) in &field_indexes {
        writer.write_u32::<LittleEndian>(*field_id)?;
        writer.write_u8(*index_type)?;
        writer.write_u64::<LittleEndian>(current_offset)?;
        writer.write_u64::<LittleEndian>(data.len() as u64)?;
        current_offset += data.len() as u64;
    }

    // Data section, in the same sorted order as the directory entries.
    for (_, _, data) in field_indexes {
        writer.write_all(&data)?;
    }

    let output_size = writer.offset() as usize;
    writer.finish()?;
    Ok(output_size)
}
1025
1026pub async fn delete_segment<D: Directory + DirectoryWriter>(
1028 dir: &D,
1029 segment_id: SegmentId,
1030) -> Result<()> {
1031 let files = SegmentFiles::new(segment_id.0);
1032 let _ = dir.delete(&files.term_dict).await;
1033 let _ = dir.delete(&files.postings).await;
1034 let _ = dir.delete(&files.store).await;
1035 let _ = dir.delete(&files.meta).await;
1036 let _ = dir.delete(&files.vectors).await;
1037 Ok(())
1038}