1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::io::Write;
6use std::mem::size_of;
7use std::sync::Arc;
8
9use rustc_hash::FxHashMap;
10
11use super::reader::SegmentReader;
12use super::store::StoreMerger;
13use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
14use crate::Result;
15use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
16use crate::dsl::{FieldType, Schema};
17use crate::structures::{
18 BlockPostingList, PositionPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo,
19};
20
/// A [`Write`] adapter around a directory [`StreamingWriter`] that counts the
/// total number of bytes written, so merge code can record file offsets and
/// section sizes while streaming output.
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    // Running total of bytes successfully written through `write`.
    offset: u64,
}
29
30impl OffsetWriter {
31 fn new(inner: Box<dyn StreamingWriter>) -> Self {
32 Self { inner, offset: 0 }
33 }
34
35 fn offset(&self) -> u64 {
37 self.offset
38 }
39
40 fn finish(self) -> std::io::Result<()> {
42 self.inner.finish()
43 }
44}
45
46impl Write for OffsetWriter {
47 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
48 let n = self.inner.write(buf)?;
49 self.offset += n as u64;
50 Ok(n)
51 }
52
53 fn flush(&mut self) -> std::io::Result<()> {
54 self.inner.flush()
55 }
56}
57
/// Renders a byte count as a human-readable string using binary units
/// (B, KB, MB, GB) with two decimal places above the 1 KiB threshold.
fn format_bytes(bytes: usize) -> String {
    const KB: usize = 1024;
    const MB: usize = 1024 * KB;
    const GB: usize = 1024 * MB;

    match bytes {
        b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
        b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
        b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
        b => format!("{} B", b),
    }
}
70
71fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
73 let mut offsets = Vec::with_capacity(segments.len());
74 let mut acc = 0u32;
75 for seg in segments {
76 offsets.push(acc);
77 acc += seg.num_docs();
78 }
79 offsets
80}
81
/// Counters accumulated over one merge, returned alongside the new
/// [`SegmentMeta`] so callers can log or monitor merge cost.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of distinct terms written to the merged term dictionary.
    pub terms_processed: usize,
    /// High-water mark of `current_memory_bytes` observed during the merge.
    pub peak_memory_bytes: usize,
    /// Approximate merge bookkeeping memory at the last sample point
    /// (see `merge_postings`; an estimate, not an exact heap measurement).
    pub current_memory_bytes: usize,
    /// Bytes written to the term-dictionary file.
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file.
    pub postings_bytes: usize,
    /// Bytes written to the document store file.
    pub store_bytes: usize,
    /// Bytes written to the dense-vector file.
    pub vectors_bytes: usize,
    /// Bytes written to the sparse-vector file.
    pub sparse_bytes: usize,
}
102
/// One head of the k-way term-dictionary merge: the current `(key, TermInfo)`
/// pair from a single source segment, plus bookkeeping to refill the heap
/// and rebase doc ids.
struct MergeEntry {
    /// Term key bytes, compared lexicographically.
    key: Vec<u8>,
    term_info: TermInfo,
    /// Index into the parallel `segments` / iterator arrays.
    segment_idx: usize,
    /// Doc-id base this segment's postings receive in the merged segment.
    doc_offset: u32,
}
110
111impl PartialEq for MergeEntry {
112 fn eq(&self, other: &Self) -> bool {
113 self.key == other.key
114 }
115}
116
// Full equivalence follows from `PartialEq` on the key alone.
impl Eq for MergeEntry {}
118
impl PartialOrd for MergeEntry {
    // Canonical forwarding to `Ord::cmp`, keeping the two orderings consistent.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
124
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed key comparison turns std's max-heap `BinaryHeap` into a
        // min-heap: the lexicographically smallest term key pops first.
        other.key.cmp(&self.key)
    }
}
131
/// Pre-trained quantization structures (keyed by field id) that allow dense
/// ANN indexes to be rebuilt during a merge instead of retrained.
pub struct TrainedVectorStructures {
    /// Coarse IVF centroids per field id.
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Product-quantization codebooks per field id (required for ScaNN).
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}
139
/// Merges multiple on-disk segments into a single new segment, combining
/// postings, positions, stored documents, dense-vector and sparse-vector
/// data according to a shared schema.
pub struct SegmentMerger {
    schema: Arc<Schema>,
}
144
145impl SegmentMerger {
146 pub fn new(schema: Arc<Schema>) -> Self {
147 Self { schema }
148 }
149
    /// Merges `segments` into a new segment identified by `new_segment_id`.
    ///
    /// ANN vector indexes are merged only when every source segment already
    /// has one; no trained structures are supplied, so none are rebuilt
    /// (see [`Self::merge_with_ann`] for that variant).
    ///
    /// Returns the merged segment's metadata and accumulated merge stats.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, None).await
    }
159
    /// Like [`Self::merge`], but with pre-trained centroids/codebooks that
    /// allow dense-vector ANN indexes to be rebuilt for the merged segment
    /// when direct index-level merging is not possible.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, Some(trained))
            .await
    }
175
    /// Shared implementation behind [`Self::merge`] and
    /// [`Self::merge_with_ann`]: merges postings (+positions), the document
    /// store, dense vectors and sparse vectors, then writes the new
    /// segment's metadata file.
    ///
    /// When `trained` is `Some`, dense-vector ANN indexes may be rebuilt
    /// from its centroids/codebooks.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Open streaming outputs for the three inverted-index files.
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            // No term carried position data: discard the (empty) positions
            // file instead of finalizing it. Delete errors are best-effort.
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // Merge the document store. Segments whose store uses a compression
        // dictionary must be recompressed; others can have their raw blocks
        // appended without decompression.
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            // Read the offset before `finish` consumes the writer.
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        let vectors_bytes = self
            .merge_dense_vectors(dir, segments, &files, trained)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // Per-field statistics are additive across segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }
291
    /// K-way merges every segment's term dictionary into `term_dict`,
    /// streaming merged posting lists to `postings_out` and merged position
    /// data to `positions_out`.
    ///
    /// Algorithm: one async iterator per segment feeds a `BinaryHeap` whose
    /// `Ord` is reversed (min-heap), so the smallest term key is always on
    /// top. All heads sharing that key are drained (refilling the heap from
    /// each consumed iterator), handed to [`Self::merge_term`], and the
    /// resulting `TermInfo` buffered in ascending key order until the final
    /// SSTable is written in one pass.
    ///
    /// Returns the number of distinct terms written.
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Doc-id base applied to each segment's postings in the merged space.
        let doc_offs = doc_offsets(segments);

        // Warm each segment's term dictionary before iterating.
        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with the first entry of every non-empty dictionary.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Merged (key, TermInfo) pairs, produced in ascending key order.
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Smallest remaining key; collect every segment that has it.
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Refill the heap from the segment we just popped.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Drain any other heads equal to `current_key`, refilling from
            // each consumed iterator as we go.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Approximate bookkeeping cost: counts only the tuple array itself,
        // not the heap bytes owned by each key `Vec` or `TermInfo`.
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Keys arrived in ascending order, so they can be inserted into the
        // SSTable writer sequentially without re-sorting.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }
423
    /// Merges the posting data of a single term drawn from `sources`
    /// (`(segment_idx, TermInfo, doc_id_offset)` triples), writing output to
    /// `postings_out` / `positions_out`, and returns the merged `TermInfo`.
    ///
    /// Fast path: when every source stores an external block posting list
    /// and there is more than one source, the pre-encoded blocks are
    /// concatenated with rebased doc ids instead of being decoded and
    /// re-encoded.
    ///
    /// Slow path: postings are decoded (inline or external) into one
    /// in-memory `PostingList`; a small, position-less result may be
    /// re-inlined into the `TermInfo` itself, skipping the postings file.
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        // Sort by doc-id offset so merged postings come out in ascending
        // doc-id order regardless of the heap's pop order.
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: concatenate already-encoded blocks.
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Slow path: decode every source into one in-memory list.
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // If the merged list fits inline (and carries no positions),
            // return it embedded in the TermInfo — no postings-file write.
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // Merge per-term position data when any source carries it.
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }
526
    /// Invokes `f(global_doc_id, ordinal, vector)` for every raw vector
    /// stored in `segment` for `field`, adding `doc_id_offset` to each
    /// doc id.
    ///
    /// Only index variants that retain raw vectors yield anything; a missing
    /// index or a variant without raw vectors is silently skipped.
    fn for_each_vector(
        segment: &SegmentReader,
        field: crate::dsl::Field,
        doc_id_offset: u32,
        mut f: impl FnMut(u32, u16, &[f32]),
    ) {
        match segment.vector_indexes().get(&field.0) {
            Some(super::VectorIndex::Flat(flat_data)) => {
                for i in 0..flat_data.num_vectors() {
                    let (doc_id, ordinal) = flat_data.get_doc_id(i);
                    f(doc_id_offset + doc_id, ordinal, flat_data.get_vector(i));
                }
            }
            Some(super::VectorIndex::RaBitQ(index)) => {
                if let Some(raw_vecs) = &index.raw_vectors {
                    // NOTE(review): assumes raw_vectors[i] belongs to doc id
                    // `i` with ordinal 0 (one vector per doc) — confirm
                    // against the RaBitQ index builder.
                    for (i, vec) in raw_vecs.iter().enumerate() {
                        f(doc_id_offset + i as u32, 0, vec);
                    }
                }
            }
            Some(super::VectorIndex::IVF { index, .. }) => {
                for cluster in index.clusters.clusters.values() {
                    if let Some(ref raw_vecs) = cluster.raw_vectors {
                        for (i, raw) in raw_vecs.iter().enumerate() {
                            f(doc_id_offset + cluster.doc_ids[i], cluster.ordinals[i], raw);
                        }
                    }
                }
            }
            _ => {}
        }
    }
562
    /// Merges dense-vector indexes for every indexed `DenseVector` field and
    /// writes them to the segment's vectors file.
    ///
    /// Per field, in order of preference:
    /// 1. every vector-bearing segment has a ScaNN (IVF-PQ) index → merge
    ///    those directly (`index_type` 2);
    /// 2. likewise for IVF-RaBitQ indexes (`index_type` 1);
    /// 3. `trained` structures supplied → rebuild the configured ANN index
    ///    by streaming raw vectors out of each segment;
    /// 4. all segments hold flat data → stream-concatenate it at write time
    ///    without buffering the vectors (`index_type` 4);
    /// 5. otherwise materialize a flat blob from whatever raw vectors exist.
    ///
    /// File layout: `u32` field count, then per field a
    /// `(u32 field_id, u8 index_type, u64 offset, u64 size)` entry (sorted
    /// by field id), followed by each field's data blob.
    ///
    /// Returns the total bytes written (0 when no field produced output;
    /// no file is created in that case).
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<usize> {
        use super::vector_data::FlatVectorData;
        use crate::dsl::VectorIndexType;

        // A fully-materialized per-field payload (merged or rebuilt index).
        struct BlobField {
            field_id: u32,
            index_type: u8,
            data: Vec<u8>,
        }

        // A flat field whose bytes are streamed straight from the source
        // segments at write time instead of being buffered.
        struct FlatStreamField {
            field_id: u32,
            dim: usize,
            total_vectors: usize,
        }

        let mut blob_fields: Vec<BlobField> = Vec::new();
        let mut flat_fields: Vec<FlatStreamField> = Vec::new();
        let doc_offs = doc_offsets(segments);

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            let segments_with_vectors = segments
                .iter()
                .filter(|s| s.has_dense_vector_index(field))
                .count();

            // Case 1: direct ScaNN index merge.
            if scann_indexes.len() == segments_with_vectors && !scann_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 2,
                            data: bytes,
                        });
                        continue;
                    }
                    Err(e) => {
                        // Fall through to the rebuild/flat paths below.
                        log::warn!("ScaNN merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Case 2: direct IVF-RaBitQ index merge.
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len() == segments_with_vectors && !ivf_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 1,
                            data: bytes,
                        });
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            let config = entry.dense_vector_config.as_ref();

            // Dimension comes from the schema config if present, otherwise
            // from the first flat index that reports a non-zero dim.
            let dim_from_segments = || -> usize {
                segments
                    .iter()
                    .filter_map(|s| match s.vector_indexes().get(&field.0) {
                        Some(super::VectorIndex::Flat(f)) => Some(f.dim),
                        _ => None,
                    })
                    .find(|&d| d > 0)
                    .unwrap_or(0)
            };
            let dim = config
                .map(|c| c.index_dim())
                .unwrap_or_else(dim_from_segments);
            if dim == 0 {
                continue;
            }

            let all_flat = segments.iter().all(|s| {
                matches!(
                    s.vector_indexes().get(&field.0),
                    Some(super::VectorIndex::Flat(_)) | None
                )
            });

            // An ANN rebuild is possible only when the trained structures
            // required by the configured index type are all present.
            let ann_type =
                trained
                    .zip(config)
                    .and_then(|(trained, config)| match config.index_type {
                        VectorIndexType::IvfRaBitQ if trained.centroids.contains_key(&field.0) => {
                            Some(VectorIndexType::IvfRaBitQ)
                        }
                        VectorIndexType::ScaNN
                            if trained.centroids.contains_key(&field.0)
                                && trained.codebooks.contains_key(&field.0) =>
                        {
                            Some(VectorIndexType::ScaNN)
                        }
                        _ => None,
                    });

            if let Some(ann) = ann_type {
                // Case 3: rebuild the ANN index by streaming raw vectors.
                // Both unwraps are safe: `ann_type` is Some only when
                // `trained` and `config` are both Some (via `zip` above).
                let trained = trained.unwrap();
                let config = config.unwrap();
                let mut total_vectors = 0usize;

                match ann {
                    VectorIndexType::IvfRaBitQ => {
                        let centroids = &trained.centroids[&field.0];
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let mut ivf_index = crate::structures::IVFRaBitQIndex::new(
                            ivf_config,
                            centroids.version,
                            codebook.version,
                        );

                        for (seg_idx, segment) in segments.iter().enumerate() {
                            let offset = doc_offs[seg_idx];
                            Self::for_each_vector(
                                segment,
                                field,
                                offset,
                                |doc_id, ordinal, vec| {
                                    ivf_index
                                        .add_vector(centroids, &codebook, doc_id, ordinal, vec);
                                    total_vectors += 1;
                                },
                            );
                        }

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 1,
                            data: bytes,
                        });
                        log::info!(
                            "Rebuilt IVF-RaBitQ for field {} ({} vectors, streaming)",
                            field.0,
                            total_vectors
                        );
                    }
                    VectorIndexType::ScaNN => {
                        let centroids = &trained.centroids[&field.0];
                        let codebook = &trained.codebooks[&field.0];
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let mut ivf_pq_index = crate::structures::IVFPQIndex::new(
                            ivf_pq_config,
                            centroids.version,
                            codebook.version,
                        );

                        for (seg_idx, segment) in segments.iter().enumerate() {
                            let offset = doc_offs[seg_idx];
                            Self::for_each_vector(
                                segment,
                                field,
                                offset,
                                |doc_id, ordinal, vec| {
                                    ivf_pq_index
                                        .add_vector(centroids, codebook, doc_id, ordinal, vec);
                                    total_vectors += 1;
                                },
                            );
                        }

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        blob_fields.push(BlobField {
                            field_id: field.0,
                            index_type: 2,
                            data: bytes,
                        });
                        log::info!(
                            "Rebuilt ScaNN for field {} ({} vectors, streaming)",
                            field.0,
                            total_vectors
                        );
                    }
                    _ => {}
                }
            } else if all_flat {
                // Case 4: defer writing — the flat bytes are streamed
                // directly from the source segments in the output pass.
                let total_vectors: usize = segments
                    .iter()
                    .filter_map(|s| {
                        if let Some(super::VectorIndex::Flat(f)) = s.vector_indexes().get(&field.0)
                        {
                            Some(f.num_vectors())
                        } else {
                            None
                        }
                    })
                    .sum();

                if total_vectors > 0 {
                    flat_fields.push(FlatStreamField {
                        field_id: field.0,
                        dim,
                        total_vectors,
                    });
                }
            } else {
                // Case 5: mixed index types without trained structures —
                // materialize a flat blob from the available raw vectors.
                // First pass only counts, so the buffer can be presized.
                let mut total_vectors = 0usize;
                for segment in segments {
                    Self::for_each_vector(segment, field, 0, |_, _, _| {
                        total_vectors += 1;
                    });
                }
                if total_vectors == 0 {
                    continue;
                }

                let total_size = FlatVectorData::serialized_binary_size(dim, total_vectors);
                let mut buf = Vec::with_capacity(total_size);
                FlatVectorData::write_binary_header(dim, total_vectors, &mut buf)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                // Second pass: raw vector payloads.
                for segment in segments {
                    Self::for_each_vector(segment, field, 0, |_, _, vec| {
                        // SAFETY: an f32 slice can be reinterpreted as its
                        // underlying bytes; the pointer and byte length come
                        // from the same live slice, and u8 has no alignment
                        // or validity requirements.
                        let bytes: &[u8] = unsafe {
                            std::slice::from_raw_parts(
                                vec.as_ptr() as *const u8,
                                std::mem::size_of_val(vec),
                            )
                        };
                        let _ = buf.write_all(bytes);
                    });
                }

                // Third pass: (doc_id, ordinal) table with rebased doc ids.
                for (seg_idx, segment) in segments.iter().enumerate() {
                    let offset = doc_offs[seg_idx];
                    Self::for_each_vector(segment, field, offset, |doc_id, ordinal, _| {
                        let _ = buf.write_all(&doc_id.to_le_bytes());
                        let _ = buf.write_all(&ordinal.to_le_bytes());
                    });
                }

                blob_fields.push(BlobField {
                    field_id: field.0,
                    index_type: 4,
                    data: buf,
                });
            }
        }

        let total_fields = blob_fields.len() + flat_fields.len();
        if total_fields == 0 {
            return Ok(0);
        }

        // Unified header entry for both blob and streamed-flat fields;
        // exactly one of blob_idx / flat_idx is Some.
        struct FieldEntry {
            field_id: u32,
            index_type: u8,
            data_size: u64,
            blob_idx: Option<usize>,
            flat_idx: Option<usize>,
        }

        let mut entries: Vec<FieldEntry> = Vec::with_capacity(total_fields);

        for (i, blob) in blob_fields.iter().enumerate() {
            entries.push(FieldEntry {
                field_id: blob.field_id,
                index_type: blob.index_type,
                data_size: blob.data.len() as u64,
                blob_idx: Some(i),
                flat_idx: None,
            });
        }

        for (i, flat) in flat_fields.iter().enumerate() {
            entries.push(FieldEntry {
                field_id: flat.field_id,
                index_type: 4, data_size: FlatVectorData::serialized_binary_size(flat.dim, flat.total_vectors)
                    as u64,
                blob_idx: None,
                flat_idx: Some(i),
            });
        }

        entries.sort_by_key(|e| e.field_id);

        use byteorder::{LittleEndian, WriteBytesExt};
        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

        // Header: field count + fixed-size per-field (id, type, offset, size).
        let per_field_entry =
            size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
        let header_size = size_of::<u32>() + entries.len() * per_field_entry;

        writer.write_u32::<LittleEndian>(entries.len() as u32)?;

        let mut current_offset = header_size as u64;
        for entry in &entries {
            writer.write_u32::<LittleEndian>(entry.field_id)?;
            writer.write_u8(entry.index_type)?;
            writer.write_u64::<LittleEndian>(current_offset)?;
            writer.write_u64::<LittleEndian>(entry.data_size)?;
            current_offset += entry.data_size;
        }

        // Body: blobs are written as-is; flat fields are streamed from the
        // source segments (vector bytes first, then the doc-id table).
        for entry in &entries {
            if let Some(blob_idx) = entry.blob_idx {
                writer.write_all(&blob_fields[blob_idx].data)?;
            } else if let Some(flat_idx) = entry.flat_idx {
                let flat = &flat_fields[flat_idx];

                FlatVectorData::write_binary_header(flat.dim, flat.total_vectors, &mut writer)?;

                for segment in segments {
                    if let Some(super::VectorIndex::Flat(flat_data)) =
                        segment.vector_indexes().get(&entry.field_id)
                    {
                        writer.write_all(flat_data.vectors_as_bytes())?;
                    }
                }

                for (seg_idx, segment) in segments.iter().enumerate() {
                    if let Some(super::VectorIndex::Flat(flat_data)) =
                        segment.vector_indexes().get(&entry.field_id)
                    {
                        let offset = doc_offs[seg_idx];
                        for &(doc_id, ordinal) in &flat_data.doc_ids {
                            writer.write_all(&(offset + doc_id).to_le_bytes())?;
                            writer.write_all(&ordinal.to_le_bytes())?;
                        }
                    }
                }
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;
        Ok(output_size)
    }
978
    /// Merges sparse-vector posting lists for every `SparseVector` field and
    /// writes them to the segment's sparse file.
    ///
    /// For each field, every segment's posting lists are bulk-loaded, then
    /// merged per dimension with rebased doc ids. File layout: `u32` field
    /// count; per field a header of `(u32 field_id, u8 quantization,
    /// u32 num_dims)` followed by a `(u32 dim_id, u64 offset, u32 len)`
    /// table (dims ascending); then all serialized posting lists in the
    /// same order.
    ///
    /// Returns the total bytes written (0 when no sparse field produced
    /// data; no file is created in that case).
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        // Sparse fields declared by the schema, with their quantization config.
        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // (field_id, quantization, dim count, per-dim serialized postings).
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Union of active dimensions across all segments.
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-load every segment's postings up front (one entry per
            // segment, empty map for segments without this field).
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge each dimension's posting lists with rebased doc ids.
            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Release the bulk-loaded postings before the next field.
            drop(segment_postings);

            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Compute the header size so absolute data offsets can be assigned
        // before anything is written.
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += per_field_header as u64;
            header_size += (*num_dims as u64) * per_dim_entry as u64;
        }

        // Assign each dimension's (offset, length), dims in ascending order.
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Data section: same ascending-dim order as the offset tables.
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
1167}
1168
1169pub async fn delete_segment<D: Directory + DirectoryWriter>(
1171 dir: &D,
1172 segment_id: SegmentId,
1173) -> Result<()> {
1174 let files = SegmentFiles::new(segment_id.0);
1175 let _ = dir.delete(&files.term_dict).await;
1176 let _ = dir.delete(&files.postings).await;
1177 let _ = dir.delete(&files.store).await;
1178 let _ = dir.delete(&files.meta).await;
1179 let _ = dir.delete(&files.vectors).await;
1180 Ok(())
1181}