1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::io::Write;
6use std::sync::Arc;
7
8use rustc_hash::FxHashMap;
9
10use super::reader::SegmentReader;
11use super::store::StoreMerger;
12use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
13use crate::Result;
14use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
15use crate::dsl::{FieldType, Schema};
16use crate::structures::{
17 BlockPostingList, PositionPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter,
18 TERMINATED, TermInfo,
19};
20
/// Wrapper around a [`StreamingWriter`] that counts the bytes written so
/// far, letting callers record file offsets while streaming output.
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    // Total bytes successfully written through `inner` so far.
    offset: u64,
}
29
impl OffsetWriter {
    /// Wrap `inner`, starting the byte counter at zero.
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Bytes written so far — i.e. the current position in the output file.
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Finalize the underlying writer, consuming `self`.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}
45
impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Count only the bytes the inner writer actually accepted.
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
57
/// Render a byte count as a human-readable string: `B` for values below
/// 1 KiB, otherwise `KB`/`MB`/`GB` with two decimal places (1024-based).
fn format_bytes(bytes: usize) -> String {
    const KB: usize = 1024;
    const MB: usize = 1024 * 1024;
    const GB: usize = 1024 * 1024 * 1024;
    match bytes {
        b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
        b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
        b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
        b => format!("{} B", b),
    }
}
70
71fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
73 let mut offsets = Vec::with_capacity(segments.len());
74 let mut acc = 0u32;
75 for seg in segments {
76 offsets.push(acc);
77 acc += seg.num_docs();
78 }
79 offsets
80}
81
/// Counters and byte sizes collected over the course of a segment merge.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    // Number of distinct terms written to the merged term dictionary.
    pub terms_processed: usize,
    // High-water mark of the tracked term-buffer memory.
    pub peak_memory_bytes: usize,
    // Currently tracked term-buffer memory.
    pub current_memory_bytes: usize,
    // Bytes written to the term-dictionary file.
    pub term_dict_bytes: usize,
    // Bytes written to the postings file.
    pub postings_bytes: usize,
    // Bytes written to the document-store file.
    pub store_bytes: usize,
    // Bytes written to the dense-vector index file.
    pub vectors_bytes: usize,
    // Bytes written to the sparse-vector index file.
    pub sparse_bytes: usize,
}
102
/// One head-of-iterator entry in the k-way term-dictionary merge heap.
struct MergeEntry {
    // Term bytes — the sole sort key for heap ordering.
    key: Vec<u8>,
    term_info: TermInfo,
    // Index of the source segment this entry came from.
    segment_idx: usize,
    // Offset to shift this segment's local doc ids into the merged id space.
    doc_offset: u32,
}
110
// Equality considers only the term key; the payload fields are ignored,
// matching the `Ord` impl below.
impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}
116
// Byte-wise key comparison is a total equivalence, so `Eq` is sound.
impl Eq for MergeEntry {}
118
impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to the total order defined in `Ord`.
        Some(self.cmp(other))
    }
}
124
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed comparison on purpose: `BinaryHeap` is a max-heap, so
        // inverting the key order makes `pop()` yield the smallest term first.
        other.key.cmp(&self.key)
    }
}
131
/// Pre-trained ANN structures, keyed by field id, used when rebuilding
/// dense-vector indexes during a merge (see [`DenseVectorStrategy::BuildAnn`]).
pub struct TrainedVectorStructures {
    // Coarse (IVF) centroids per field.
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    // Product-quantization codebooks per field (required for ScaNN builds).
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}
139
/// Selects how dense-vector indexes are produced for the merged segment.
pub enum DenseVectorStrategy<'a> {
    // Merge the segments' existing indexes where possible.
    MergeExisting,
    // Build fresh ANN indexes from raw vectors using pre-trained structures.
    BuildAnn(&'a TrainedVectorStructures),
}
147
/// Merges multiple on-disk segments into a single new segment.
pub struct SegmentMerger {
    schema: Arc<Schema>,
}
152
153impl SegmentMerger {
    /// Create a merger for segments that share `schema`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }
157
    /// Merge `segments` into a new segment identified by `new_segment_id`,
    /// keeping whatever dense-vector indexes already exist (no ANN retraining).
    ///
    /// Returns the new segment's metadata together with merge statistics.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::MergeExisting,
        )
        .await
    }
173
    /// Shared merge driver: writes postings / term dictionary / positions,
    /// the document store, dense-vector and sparse-vector files, and finally
    /// the merged segment metadata.
    ///
    /// `dense_strategy` selects how dense-vector indexes are produced.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        dense_strategy: DenseVectorStrategy<'_>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            // No term carried positions: drop the writer unfinished and
            // best-effort delete the (empty) positions file.
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // Merge document stores. Segments with a compression dictionary must
        // be recompressed; others have their raw blocks appended verbatim.
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        let vectors_bytes = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => {
                self.merge_dense_vectors(dir, segments, &files).await?
            }
            DenseVectorStrategy::BuildAnn(trained) => {
                self.build_ann_vectors(dir, segments, &files, trained)
                    .await?
            }
        };
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // Merged per-field statistics are the element-wise sums of the inputs.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => "Merge",
            DenseVectorStrategy::BuildAnn(_) => "ANN merge",
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }
294
    /// K-way merge of the segments' term dictionaries.
    ///
    /// Streams terms in sorted order through a min-heap of per-segment
    /// iterators, merges each term's postings (and positions) via
    /// `merge_term`, and finally writes the combined dictionary as an
    /// SSTable. Returns the number of distinct terms written.
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with the first term of each segment.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Merged (term, TermInfo) pairs are buffered so the SSTable can be
        // written sequentially once all postings offsets are known.
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Smallest remaining term (heap order is reversed; see `Ord`).
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Refill the heap from the segment we just consumed from.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Drain every other segment that holds the same term, refilling
            // the heap from each as it is consumed.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // NOTE(review): this counts only the tuple storage, not the heap
        // allocation behind each key `Vec<u8>`, so it understates real usage.
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Keys arrive in sorted order, as the SSTable writer requires.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }
426
    /// Merge one term's postings (and positions, if any) from `sources` and
    /// write them out, returning the merged segment's `TermInfo`.
    ///
    /// `sources` holds `(segment_idx, TermInfo, doc_offset)` triples; local
    /// doc ids are shifted by `doc_offset` into the merged id space.
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        // Process sources in ascending doc-offset order so the merged doc
        // ids come out sorted.
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: every source already has block postings, so whole
            // blocks can be concatenated without re-encoding per doc.
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Slow path: decode every source (inline or external) into a
            // flat posting list, then re-encode as blocks.
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Small position-free lists may fit inline in the TermInfo
            // itself, avoiding an external postings entry entirely.
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        if any_positions {
            // Concatenate position lists from the sources that have them.
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }
529
    /// Write the merged dense-vector file for all `DenseVector` fields.
    ///
    /// Per field, strategies are tried in order of decreasing efficiency:
    /// 1. merge existing ScaNN (IVF-PQ) indexes (type tag 2);
    /// 2. merge existing IVF-RaBitQ indexes (type tag 1);
    /// 3. rebuild a RaBitQ index from raw vectors (type tag 0).
    ///
    /// Returns the number of bytes written (0 when no field produced data).
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        // (field_id, index_type_tag, serialized index bytes)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            // Merge ScaNN only when *every* segment holding a dense index
            // for this field has a ScaNN one.
            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); continue;
                    }
                    Err(e) => {
                        // Fall through to the IVF path below.
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            // Same all-or-nothing rule for IVF-RaBitQ indexes.
            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Last resort: gather raw vectors from every segment and rebuild.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                // NOTE(review): assumes every vector shares the first one's
                // dimensionality — TODO confirm upstream validation.
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); }
        }

        write_vector_file(dir, files, field_indexes).await
    }
639
    /// Merge sparse-vector posting lists for all `SparseVector` fields and
    /// write the segment's sparse file. Returns the bytes written, or 0 when
    /// the schema has no sparse fields or none contained data.
    ///
    /// File layout: `[num_fields: u32]`, then per field a header
    /// (`field_id: u32, quantization: u8, num_dims: u32`) followed by a
    /// dimension table of `(dim_id: u32, offset: u64, len: u32)` records,
    /// then all posting-list payloads in the same (sorted-dim) order.
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // (field_id, weight quantization, dim count, per-dim serialized postings)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            // Default to Float32 weights when no explicit config is present.
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Union of active dimension ids across all segments.
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read each segment's postings up front to avoid per-dim I/O;
            // segments without this field get an empty map.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            for dim_id in all_dims {
                // Collect this dimension's posting list from every segment
                // that has it, paired with the segment's doc-id offset.
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Release the bulk-read postings before the next field.
            drop(segment_postings);

            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        // Emit fields in ascending field-id order.
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Header size: u32 field count, then per field a 9-byte header
        // (u32 + u8 + u32) plus 16 bytes (u32 + u64 + u32) per dim entry.
        let mut header_size = 4u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += 4 + 1 + 4;
            header_size += (*num_dims as u64) * 16;
        }

        // Pre-compute every dimension's payload offset; payloads start
        // immediately after the header.
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Payloads, in the same sorted-dimension order used for the tables.
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
826
    /// Merge `segments` into a new segment, rebuilding dense-vector ANN
    /// indexes from raw vectors using the pre-trained structures in
    /// `trained` (see [`SegmentMerger::merge`] for the non-ANN variant).
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::BuildAnn(trained),
        )
        .await
    }
843
    /// Build fresh ANN indexes for indexed dense-vector fields from the
    /// segments' flat (raw) vectors, using pre-trained centroids/codebooks.
    ///
    /// Type tags written: 1 = IVF-RaBitQ, 2 = ScaNN (IVF-PQ), 4 = flat
    /// fallback. Returns the number of bytes written to the vectors file.
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            // Gather all raw vectors with their (global doc id, ordinal) pairs.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                // Offsets advance even for segments without a flat index.
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            // ANN builders only take doc ids; ordinals stay in the flat data.
            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    // ScaNN needs both trained centroids and a PQ codebook.
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            // Fallback: persist the vectors as a flat index (no trained
            // structures available for this field / index type).
            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = flat_data.to_binary_bytes();
            field_indexes.push((field.0, 4u8, bytes)); }

        write_vector_file(dir, files, field_indexes).await
    }
979}
980
981async fn write_vector_file<D: Directory + DirectoryWriter>(
985 dir: &D,
986 files: &SegmentFiles,
987 mut field_indexes: Vec<(u32, u8, Vec<u8>)>,
988) -> Result<usize> {
989 use byteorder::{LittleEndian, WriteBytesExt};
990
991 if field_indexes.is_empty() {
992 return Ok(0);
993 }
994
995 field_indexes.sort_by_key(|(id, _, _)| *id);
996
997 let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);
998
999 let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
1001 writer.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
1002
1003 let mut current_offset = header_size as u64;
1004 for (field_id, index_type, data) in &field_indexes {
1005 writer.write_u32::<LittleEndian>(*field_id)?;
1006 writer.write_u8(*index_type)?;
1007 writer.write_u64::<LittleEndian>(current_offset)?;
1008 writer.write_u64::<LittleEndian>(data.len() as u64)?;
1009 current_offset += data.len() as u64;
1010 }
1011
1012 for (_, _, data) in field_indexes {
1014 writer.write_all(&data)?;
1015 }
1016
1017 let output_size = writer.offset() as usize;
1018 writer.finish()?;
1019 Ok(output_size)
1020}
1021
1022pub async fn delete_segment<D: Directory + DirectoryWriter>(
1024 dir: &D,
1025 segment_id: SegmentId,
1026) -> Result<()> {
1027 let files = SegmentFiles::new(segment_id.0);
1028 let _ = dir.delete(&files.term_dict).await;
1029 let _ = dir.delete(&files.postings).await;
1030 let _ = dir.delete(&files.store).await;
1031 let _ = dir.delete(&files.meta).await;
1032 let _ = dir.delete(&files.vectors).await;
1033 Ok(())
1034}