1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::io::Write;
6use std::mem::size_of;
7use std::sync::Arc;
8
9use rustc_hash::FxHashMap;
10
11use super::reader::SegmentReader;
12use super::store::StoreMerger;
13use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
14use crate::Result;
15use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
16use crate::dsl::{FieldType, Schema};
17use crate::structures::{
18 BlockPostingList, PositionPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo,
19};
20
/// Streaming writer wrapper that tracks how many bytes have been written.
///
/// Merged segment files are addressed by byte offset, so every section needs
/// to know where it starts; `offset` advances on each successful `write`.
pub(crate) struct OffsetWriter {
    // Underlying directory-backed streaming destination.
    inner: Box<dyn StreamingWriter>,
    // Total bytes accepted by `write` so far (current end-of-stream position).
    offset: u64,
}
29
impl OffsetWriter {
    /// Wraps `inner`, starting the byte counter at zero.
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Bytes written so far; used to record section sizes and start offsets.
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Consumes the writer and finalizes the underlying stream.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}
45
46impl Write for OffsetWriter {
47 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
48 let n = self.inner.write(buf)?;
49 self.offset += n as u64;
50 Ok(n)
51 }
52
53 fn flush(&mut self) -> std::io::Result<()> {
54 self.inner.flush()
55 }
56}
57
/// Renders a byte count as a human-readable size string using binary units
/// (`B`, `KB`, `MB`, `GB`), with two decimal places above the 1 KiB threshold.
fn format_bytes(bytes: usize) -> String {
    const KB: f64 = 1024.0;
    const MB: f64 = KB * 1024.0;
    const GB: f64 = MB * 1024.0;

    let value = bytes as f64;
    if value >= GB {
        format!("{:.2} GB", value / GB)
    } else if value >= MB {
        format!("{:.2} MB", value / MB)
    } else if value >= KB {
        format!("{:.2} KB", value / KB)
    } else {
        format!("{} B", bytes)
    }
}
70
71fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
73 let mut offsets = Vec::with_capacity(segments.len());
74 let mut acc = 0u32;
75 for seg in segments {
76 offsets.push(acc);
77 acc += seg.num_docs();
78 }
79 offsets
80}
81
/// Counters describing the work performed and output sizes of one merge run.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of distinct terms written to the merged term dictionary.
    pub terms_processed: usize,
    /// Highest value `current_memory_bytes` reached during the merge.
    pub peak_memory_bytes: usize,
    /// Estimated bytes currently held by merge buffers.
    pub current_memory_bytes: usize,
    /// Bytes written to the term-dictionary file.
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file.
    pub postings_bytes: usize,
    /// Bytes written to the document store file.
    pub store_bytes: usize,
    /// Bytes written to the dense-vector index file.
    pub vectors_bytes: usize,
    /// Bytes written to the sparse-vector index file.
    pub sparse_bytes: usize,
}
102
/// Head-of-iterator entry in the k-way term-dictionary merge heap.
struct MergeEntry {
    /// Term bytes — the sort key across all segments.
    key: Vec<u8>,
    /// Posting metadata for this term in its source segment.
    term_info: TermInfo,
    /// Index of the source segment within the `segments` slice.
    segment_idx: usize,
    /// Base offset added when remapping this segment's local doc ids.
    doc_offset: u32,
}
110
// Entries compare by term key only: heads carrying the same term from
// different segments are considered equal so they can be drained together.
impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed operands: `BinaryHeap` is a max-heap, so comparing
        // `other` against `self` turns it into a min-heap over term keys.
        other.key.cmp(&self.key)
    }
}
131
132pub struct TrainedVectorStructures {
134 pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
136 pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
138}
139
/// Merges multiple on-disk segments into a single new segment.
pub struct SegmentMerger {
    // Index schema; drives which fields get postings, dense- and
    // sparse-vector merging.
    schema: Arc<Schema>,
}
144
145impl SegmentMerger {
146 pub fn new(schema: Arc<Schema>) -> Self {
147 Self { schema }
148 }
149
    /// Merges `segments` into a new segment written under `new_segment_id`,
    /// without any pre-trained ANN structures (existing per-segment vector
    /// indexes are merged or flattened as-is).
    ///
    /// Returns the new segment's metadata together with merge statistics.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, None).await
    }
159
    /// Like [`Self::merge`], but supplies pre-trained centroids/codebooks so
    /// dense-vector ANN indexes can be rebuilt during the merge instead of
    /// falling back to flat storage.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, Some(trained))
            .await
    }
175
    /// Shared merge implementation: writes postings, positions, term dict,
    /// store, dense- and sparse-vector files for the new segment, then its
    /// metadata. `trained` selects the ANN-rebuild path for dense vectors.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Open all streaming outputs up front; offsets are tracked per file.
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            // No term had positions: discard the (empty) positions file.
            // Deletion failure is deliberately ignored (best-effort cleanup).
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // Merge document stores. Segments with a compression dictionary are
        // recompressed; others have their raw blocks appended untouched.
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        let vectors_bytes = self
            .merge_dense_vectors(dir, segments, &files, trained)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // Sum per-field token/doc statistics across all source segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        // Metadata is written last, after all data files are finalized.
        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }
291
    /// K-way merges all segments' term dictionaries, writing merged postings
    /// (and positions) as each term is completed, then writes the final
    /// term dictionary as an SSTable. Returns the number of terms processed.
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the heap with the first term of every non-empty segment.
        // MergeEntry's reversed Ord makes this a min-heap on term key.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Term dict entries are buffered in memory (keys arrive in sorted
        // order) and written as one SSTable after all postings are merged.
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Smallest key overall; gather every segment holding this term.
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Refill from the segment we just consumed before peeking again.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Drain all other heads that carry the same term key.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Rough buffer accounting: capacity of the in-memory term list
        // (does not include the heap-allocated key bytes themselves).
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Keys were produced in ascending order, as SSTableWriter requires.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }
423
    /// Merges one term's postings from all `sources` (segment index,
    /// per-segment TermInfo, doc-id offset) and writes the combined list.
    ///
    /// Fast path: when every source is an external block list and there is
    /// more than one, blocks are concatenated without re-decoding. Otherwise
    /// postings are decoded, remapped and re-encoded; a small result may be
    /// inlined directly into the returned `TermInfo` (positions permitting).
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        // Sort by doc offset so concatenated doc ids stay ascending.
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Block-level concatenation: read each segment's block list and
            // stitch them together with the doc-id offsets applied.
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Decode path: rebuild a flat posting list with remapped doc ids.
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Inline encoding has no room for a positions pointer, so it is
            // only attempted when no source carries positions.
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // Concatenate position lists, if any source has them.
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }
526
    /// Merges dense-vector indexes for every indexed DenseVector field and
    /// writes the combined vectors file. Returns the file's size in bytes
    /// (0 if no field produced an index).
    ///
    /// Per-field strategy, in order of preference:
    /// 1. If every segment that has vectors holds a ScaNN (IVF-PQ) index,
    ///    merge those directly (index-type tag 2).
    /// 2. Same for IVF-RaBitQ indexes (tag 1).
    /// 3. Otherwise collect raw vectors and either rebuild an ANN index from
    ///    the supplied `trained` structures, or fall back to flat storage
    ///    (tag 4).
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;

        // (field_id, index-type tag, serialized index bytes)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            let segments_with_vectors = segments
                .iter()
                .filter(|s| s.has_dense_vector_index(field))
                .count();

            // Path 1: direct ScaNN merge — only valid when ALL segments with
            // vectors for this field have a ScaNN index.
            if scann_indexes.len() == segments_with_vectors && !scann_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        // Fall through to the rebuild/flat path below.
                        log::warn!("ScaNN merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            // Path 2: direct IVF-RaBitQ merge under the same all-or-nothing rule.
            if ivf_indexes.len() == segments_with_vectors && !ivf_indexes.is_empty() {
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes));
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Path 3: gather raw vectors (with remapped doc ids) from
            // whatever index representation each segment has.
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let doc_offs = doc_offsets(segments);

            for (seg_idx, segment) in segments.iter().enumerate() {
                let offset = doc_offs[seg_idx];
                match segment.vector_indexes().get(&field.0) {
                    Some(super::VectorIndex::Flat(flat_data)) => {
                        for (vec, &(local_doc_id, ordinal)) in
                            flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                        {
                            all_vectors.push(vec.clone());
                            all_doc_ids.push((offset + local_doc_id, ordinal));
                        }
                    }
                    Some(super::VectorIndex::RaBitQ(index)) => {
                        // Raw vectors may be absent; then this segment's
                        // vectors are silently unavailable for rebuild.
                        if let Some(raw_vecs) = &index.raw_vectors {
                            for (i, vec) in raw_vecs.iter().enumerate() {
                                all_vectors.push(vec.clone());
                                all_doc_ids.push((offset + i as u32, 0));
                            }
                        }
                    }
                    Some(super::VectorIndex::IVF { index, .. }) => {
                        for cluster in index.clusters.clusters.values() {
                            if let Some(ref raw_vecs) = cluster.raw_vectors {
                                for (i, raw) in raw_vecs.iter().enumerate() {
                                    all_vectors.push(raw.clone());
                                    all_doc_ids
                                        .push((offset + cluster.doc_ids[i], cluster.ordinals[i]));
                                }
                            }
                        }
                    }
                    _ => {}
                }
            }

            if all_vectors.is_empty() {
                continue;
            }

            let config = entry.dense_vector_config.as_ref();
            let dim = config
                .map(|c| c.index_dim())
                .unwrap_or(all_vectors[0].len());

            // Rebuild an ANN index if pre-trained structures were supplied
            // for this field; otherwise fall back to flat storage.
            let mut built_ann = false;
            if let (Some(trained), Some(config)) = (trained, config) {
                match config.index_type {
                    VectorIndexType::IvfRaBitQ => {
                        if let Some(centroids) = trained.centroids.get(&field.0) {
                            let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                            let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);
                            let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                                .with_store_raw(config.store_raw);
                            let ivf_index = crate::structures::IVFRaBitQIndex::build(
                                ivf_config,
                                centroids,
                                &codebook,
                                &all_vectors,
                                Some(&all_doc_ids),
                            );
                            let index_data = super::builder::IVFRaBitQIndexData {
                                centroids: (**centroids).clone(),
                                codebook,
                                index: ivf_index,
                            };
                            let bytes = index_data
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            field_indexes.push((field.0, 1u8, bytes));
                            built_ann = true;
                            log::info!(
                                "Rebuilt IVF-RaBitQ for field {} ({} vectors)",
                                field.0,
                                all_vectors.len()
                            );
                        }
                    }
                    VectorIndexType::ScaNN => {
                        // ScaNN rebuild needs both centroids and a PQ codebook.
                        if let (Some(centroids), Some(codebook)) = (
                            trained.centroids.get(&field.0),
                            trained.codebooks.get(&field.0),
                        ) {
                            let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                            let ivf_pq_index = crate::structures::IVFPQIndex::build(
                                ivf_pq_config,
                                centroids,
                                codebook,
                                &all_vectors,
                                Some(&all_doc_ids),
                            );
                            let index_data = super::builder::ScaNNIndexData {
                                centroids: (**centroids).clone(),
                                codebook: (**codebook).clone(),
                                index: ivf_pq_index,
                            };
                            let bytes = index_data
                                .to_bytes()
                                .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                            field_indexes.push((field.0, 2u8, bytes));
                            built_ann = true;
                            log::info!(
                                "Rebuilt ScaNN for field {} ({} vectors)",
                                field.0,
                                all_vectors.len()
                            );
                        }
                    }
                    _ => {}
                }
            }

            if !built_ann {
                let flat_data = super::vector_data::FlatVectorData {
                    dim,
                    vectors: all_vectors,
                    doc_ids: all_doc_ids,
                };
                let bytes = flat_data.to_binary_bytes();
                field_indexes.push((field.0, 4u8, bytes));
            }
        }

        write_vector_file(dir, files, field_indexes).await
    }
737
    /// Merges sparse-vector postings for every SparseVector field and writes
    /// the combined sparse file. Returns the file's size in bytes (0 if no
    /// sparse fields exist or none produced data).
    ///
    /// File layout: a header (`u32` field count, then per field: `u32` field
    /// id, `u8` quantization, `u32` dim count, then per dimension `u32` dim
    /// id / `u64` offset / `u32` length) followed by each dimension's
    /// serialized posting list, fields and dims in ascending id order.
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // (field_id, quantization, dim count, dim_id -> serialized postings)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Union of active dimension ids across all segments.
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read every segment's postings up front; segments without
            // this field get an empty map so indices stay aligned.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge each dimension's posting lists with doc-id offsets applied.
            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Release all bulk-read postings before moving to the next field.
            drop(segment_postings);

            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Pre-compute the header size so data offsets can be absolute.
        // These sizes must match the write_u32/u8/u64 calls below exactly.
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += per_field_header as u64;
            header_size += (*num_dims as u64) * per_dim_entry as u64;
        }

        // Lay out each dimension's data region (sorted by dim id, the same
        // order used when writing the payload below).
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        // Header: field count, then each field's descriptor and dim table.
        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Payload: posting bytes in the same field/dim order as the tables.
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }
926}
927
/// Writes the dense-vector file from per-field serialized indexes and returns
/// its size in bytes (0 when there is nothing to write, in which case no file
/// is created).
///
/// Layout: `u32` field count, then per field a fixed-size entry (`u32` field
/// id, `u8` index-type tag, `u64` absolute data offset, `u64` data length),
/// followed by each field's index bytes in the same (ascending field id) order.
async fn write_vector_file<D: Directory + DirectoryWriter>(
    dir: &D,
    files: &SegmentFiles,
    mut field_indexes: Vec<(u32, u8, Vec<u8>)>,
) -> Result<usize> {
    use byteorder::{LittleEndian, WriteBytesExt};

    if field_indexes.is_empty() {
        return Ok(0);
    }

    field_indexes.sort_by_key(|(id, _, _)| *id);

    let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

    // Header size must match the writes below so data offsets are absolute.
    let per_field_entry = size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
    let header_size = size_of::<u32>() + field_indexes.len() * per_field_entry;
    writer.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

    let mut current_offset = header_size as u64;
    for (field_id, index_type, data) in &field_indexes {
        writer.write_u32::<LittleEndian>(*field_id)?;
        writer.write_u8(*index_type)?;
        writer.write_u64::<LittleEndian>(current_offset)?;
        writer.write_u64::<LittleEndian>(data.len() as u64)?;
        current_offset += data.len() as u64;
    }

    // Payload in the same sorted order the header was written.
    for (_, _, data) in field_indexes {
        writer.write_all(&data)?;
    }

    let output_size = writer.offset() as usize;
    writer.finish()?;
    Ok(output_size)
}
969
970pub async fn delete_segment<D: Directory + DirectoryWriter>(
972 dir: &D,
973 segment_id: SegmentId,
974) -> Result<()> {
975 let files = SegmentFiles::new(segment_id.0);
976 let _ = dir.delete(&files.term_dict).await;
977 let _ = dir.delete(&files.postings).await;
978 let _ = dir.delete(&files.store).await;
979 let _ = dir.delete(&files.meta).await;
980 let _ = dir.delete(&files.vectors).await;
981 Ok(())
982}