hermes_core/segment/merger.rs

//! Segment merger for combining multiple segments

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io::Write;
use std::mem::size_of;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PositionPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter,
    TERMINATED, TermInfo,
};

/// Write adapter that tracks bytes written.
///
/// Concrete type so it works with generic `serialize<W: Write>` functions
/// (unlike `dyn StreamingWriter` which isn't `Sized`).
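///
/// Usage sketch: wrap the directory's streaming writer, serialize a section
/// through the `Write` impl, then read `offset()` to record where the next
/// section begins before calling `finish()`.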
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    offset: u64,
}

impl OffsetWriter {
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Current write position (total bytes written so far).
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Finalize the underlying streaming writer.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}

impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// Format byte count as human-readable string
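///
/// Uses binary multiples (1 KB = 1024 B). Illustrative:
///
/// ```ignore
/// assert_eq!(format_bytes(512), "512 B");
/// assert_eq!(format_bytes(1536), "1.50 KB");
/// assert_eq!(format_bytes(3 * 1024 * 1024), "3.00 MB");
/// ```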
fn format_bytes(bytes: usize) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}

/// Compute per-segment doc ID offsets (each segment's docs start after the previous)
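///
/// For example, segments with 100, 250, and 80 docs produce offsets
/// `[0, 100, 350]`.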
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Peak memory usage in bytes (estimated)
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes (estimated)
    pub current_memory_bytes: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
}

/// Entry for k-way merge heap
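///
/// Compared by `key` in reverse (see the `Ord` impl below) so that
/// `BinaryHeap`, a max-heap, pops the lexicographically smallest term key
/// first.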
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse order for min-heap (BinaryHeap is max-heap by default)
        other.key.cmp(&self.key)
    }
}

/// Trained vector index structures for rebuilding segments with ANN indexes
pub struct TrainedVectorStructures {
    /// Trained centroids per field_id
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    /// Trained PQ codebooks per field_id (for ScaNN)
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

/// Strategy for handling dense vector indexes during merge
pub enum DenseVectorStrategy<'a> {
    /// Merge existing indexes (ScaNN/IVF cluster merge or RaBitQ rebuild)
    MergeExisting,
    /// Build ANN indexes from flat vectors using trained structures
    BuildAnn(&'a TrainedVectorStructures),
}

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::MergeExisting,
        )
        .await
    }

    /// Core merge: handles all mandatory parts (postings, positions, store, sparse, field stats, meta)
    /// and delegates dense vector handling to the provided strategy.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        dense_strategy: DenseVectorStrategy<'_>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === Phase 1: merge postings + positions (streaming) ===
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // === Phase 2: merge store files (streaming) ===
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        // === Dense vectors: strategy-dependent ===
        let vectors_bytes = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => {
                self.merge_dense_vectors(dir, segments, &files).await?
            }
            DenseVectorStrategy::BuildAnn(trained) => {
                self.build_ann_vectors(dir, segments, &files, trained)
                    .await?
            }
        };
        stats.vectors_bytes = vectors_bytes;

        // === Mandatory: merge sparse vectors ===
        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = match &dense_strategy {
            DenseVectorStrategy::MergeExisting => "Merge",
            DenseVectorStrategy::BuildAnn(_) => "ANN merge",
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

    /// Merge postings from multiple segments using streaming k-way merge
    ///
    /// This implementation uses a min-heap to merge terms from all segments
    /// in sorted order without loading all terms into memory at once.
    /// Memory usage is O(num_segments) instead of O(total_terms).
    ///
    /// Optimization: For terms that exist in only one segment, we copy the
    /// posting data directly without decode/encode. Only terms that exist
    /// in multiple segments need full merge.
    ///
    /// Returns the number of terms processed.
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        // Bulk-prefetch all term dict blocks (1 I/O per segment instead of ~160)
        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Buffer term results: needed because SSTableWriter can't be held across await points.
        // Memory is bounded by unique terms (typically much smaller than postings).
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Check if other segments have the same key
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            // Process this term (handles both single-source and multi-source)
            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Track memory (only term_results is buffered; postings/positions stream to disk)
        let results_mem = term_results.capacity() * size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

    /// Merge a single term's postings + positions from one or more source segments.
    ///
    /// Fast path: when all sources are external and there are multiple,
    /// uses block-level concatenation (O(blocks) instead of O(postings)).
    /// Otherwise: full decode → remap doc IDs → re-encode.
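    ///
    /// Doc ID remapping example: IDs `[0, 5]` from a segment whose docs start
    /// at global offset 100 become `[100, 105]` in the merged posting list.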
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        // === Merge postings ===
        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Decode all sources into a flat PostingList, remap doc IDs
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Try to inline (only when no positions)
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // === Merge positions (if any source has them) ===
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }

    /// Merge dense vector indexes - returns output size in bytes
    ///
    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
    /// For RaBitQ: Must rebuild with new centroid from all vectors
    async fn merge_dense_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Check if all segments have ScaNN indexes for this field
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                // All segments have ScaNN - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Check if all segments have IVF indexes for this field
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                // All segments have IVF - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                let doc_offs = doc_offsets(segments);

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offs) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fall back to RaBitQ rebuild (collect raw vectors)
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
            }
        }

        write_vector_file(dir, files, field_indexes).await
    }

    /// Merge sparse vector indexes using block stacking
    ///
    /// This is O(blocks) instead of O(postings) - we stack blocks directly
    /// and only adjust the first_doc_id in each block header by the doc offset.
    /// Deltas within blocks remain unchanged since they're relative.
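    ///
    /// For example, a block with `first_doc_id = 17` coming from a segment
    /// whose docs start at global offset 1000 is rewritten to
    /// `first_doc_id = 1017`; the delta-encoded doc IDs inside the block are
    /// copied verbatim.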
    async fn merge_sparse_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use crate::structures::BlockSparsePostingList;
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_offs = doc_offsets(segments);
        for (i, seg) in segments.iter().enumerate() {
            log::debug!(
                "Sparse merge: segment {} has {} docs, doc_offset={}",
                i,
                seg.num_docs(),
                doc_offs[i]
            );
        }

        // Collect all sparse vector fields from schema
        let sparse_fields: Vec<_> = self
            .schema
            .fields()
            .filter(|(_, entry)| matches!(entry.field_type, FieldType::SparseVector))
            .map(|(field, entry)| (field, entry.sparse_vector_config.clone()))
            .collect();

        if sparse_fields.is_empty() {
            return Ok(0);
        }

        // Collect field data: (field_id, quantization, num_dims, dim_id -> serialized posting list)
        type SparseFieldData = (
            u32,
            crate::structures::WeightQuantization,
            u32,
            FxHashMap<u32, Vec<u8>>,
        );
        let mut field_data: Vec<SparseFieldData> = Vec::new();

        for (field, sparse_config) in &sparse_fields {
            // Get quantization from config
            let quantization = sparse_config
                .as_ref()
                .map(|c| c.weight_quantization)
                .unwrap_or(crate::structures::WeightQuantization::Float32);

            // Collect all dimensions across all segments for this field
            let mut all_dims: rustc_hash::FxHashSet<u32> = rustc_hash::FxHashSet::default();
            for segment in segments {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    for dim_id in sparse_index.active_dimensions() {
                        all_dims.insert(dim_id);
                    }
                }
            }

            if all_dims.is_empty() {
                continue;
            }

            // Bulk-read ALL posting lists per segment in one I/O call each.
            // This replaces 80K+ individual get_posting() calls with ~4 bulk reads.
            let mut segment_postings: Vec<FxHashMap<u32, Arc<BlockSparsePostingList>>> =
                Vec::with_capacity(segments.len());
            for (seg_idx, segment) in segments.iter().enumerate() {
                if let Some(sparse_index) = segment.sparse_indexes().get(&field.0) {
                    log::debug!(
                        "Sparse merge field {}: bulk-reading {} dims from segment {}",
                        field.0,
                        sparse_index.num_dimensions(),
                        seg_idx
                    );
                    let postings = sparse_index.read_all_postings_bulk().await?;
                    segment_postings.push(postings);
                } else {
                    segment_postings.push(FxHashMap::default());
                }
            }

            let mut dim_bytes: FxHashMap<u32, Vec<u8>> = FxHashMap::default();

            // Merge from in-memory data — no I/O in this loop
            for dim_id in all_dims {
                let mut posting_arcs: Vec<(Arc<BlockSparsePostingList>, u32)> = Vec::new();

                for (seg_idx, postings) in segment_postings.iter().enumerate() {
                    if let Some(posting_list) = postings.get(&dim_id) {
                        posting_arcs.push((Arc::clone(posting_list), doc_offs[seg_idx]));
                    }
                }

                if posting_arcs.is_empty() {
                    continue;
                }

                let lists_with_offsets: Vec<(&BlockSparsePostingList, u32)> = posting_arcs
                    .iter()
                    .map(|(pl, offset)| (pl.as_ref(), *offset))
                    .collect();

                let merged = BlockSparsePostingList::merge_with_offsets(&lists_with_offsets);

                let mut bytes = Vec::new();
                merged.serialize(&mut bytes).map_err(crate::Error::Io)?;
                dim_bytes.insert(dim_id, bytes);
            }

            // Drop bulk data before accumulating output
            drop(segment_postings);

            // Store num_dims (active count) instead of max_dim_id
            field_data.push((field.0, quantization, dim_bytes.len() as u32, dim_bytes));
        }

        if field_data.is_empty() {
            return Ok(0);
        }

        // Sort by field_id
        field_data.sort_by_key(|(id, _, _, _)| *id);

        // Compute header size and per-dimension offsets before writing
        // num_fields(u32) + per-field: field_id(u32) + quant(u8) + num_dims(u32)
        // per-dim: dim_id(u32) + offset(u64) + length(u32)
        let per_dim_entry = size_of::<u32>() + size_of::<u64>() + size_of::<u32>();
        let per_field_header = size_of::<u32>() + size_of::<u8>() + size_of::<u32>();
        let mut header_size = size_of::<u32>() as u64;
        for (_, _, num_dims, _) in &field_data {
            header_size += per_field_header as u64;
            header_size += (*num_dims as u64) * per_dim_entry as u64;
        }

        // Pre-compute offset tables (small — just dim_id + offset + length per dim)
        let mut current_offset = header_size;
        let mut field_tables: Vec<Vec<(u32, u64, u32)>> = Vec::new();
        for (_, _, _, dim_bytes) in &field_data {
            let mut table: Vec<(u32, u64, u32)> = Vec::with_capacity(dim_bytes.len());
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                let bytes = &dim_bytes[&dim_id];
                table.push((dim_id, current_offset, bytes.len() as u32));
                current_offset += bytes.len() as u64;
            }
            field_tables.push(table);
        }

        // Stream header + tables + data directly to disk
        let mut writer = OffsetWriter::new(dir.streaming_writer(&files.sparse).await?);

        writer.write_u32::<LittleEndian>(field_data.len() as u32)?;
        for (i, (field_id, quantization, num_dims, _)) in field_data.iter().enumerate() {
            writer.write_u32::<LittleEndian>(*field_id)?;
            writer.write_u8(*quantization as u8)?;
            writer.write_u32::<LittleEndian>(*num_dims)?;
            for &(dim_id, offset, length) in &field_tables[i] {
                writer.write_u32::<LittleEndian>(dim_id)?;
                writer.write_u64::<LittleEndian>(offset)?;
                writer.write_u32::<LittleEndian>(length)?;
            }
        }

        // Stream posting data per-field, per-dimension (drop each after writing)
        for (_, _, _, dim_bytes) in field_data {
            let mut dims: Vec<_> = dim_bytes.keys().copied().collect();
            dims.sort();
            for dim_id in dims {
                writer.write_all(&dim_bytes[&dim_id])?;
            }
        }

        let output_size = writer.offset() as usize;
        writer.finish()?;

        log::info!(
            "Sparse vector merge complete: {} fields, {} bytes",
            field_tables.len(),
            output_size
        );

        Ok(output_size)
    }

    /// Merge segments and rebuild dense vectors with ANN indexes using trained structures.
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(
            dir,
            segments,
            new_segment_id,
            DenseVectorStrategy::BuildAnn(trained),
        )
        .await
    }

    /// Build ANN indexes from Flat vectors using trained centroids
    async fn build_ann_vectors<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
        trained: &TrainedVectorStructures,
    ) -> Result<usize> {
        use crate::dsl::VectorIndexType;

        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) || !entry.indexed {
                continue;
            }

            let config = match &entry.dense_vector_config {
                Some(c) => c,
                None => continue,
            };

            // Collect all Flat vectors from segments
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
            let mut all_doc_ids: Vec<(u32, u16)> = Vec::new();
            let mut doc_offset = 0u32;

            for segment in segments {
                if let Some(super::VectorIndex::Flat(flat_data)) =
                    segment.vector_indexes().get(&field.0)
                {
                    for (vec, (local_doc_id, ordinal)) in
                        flat_data.vectors.iter().zip(flat_data.doc_ids.iter())
                    {
                        all_vectors.push(vec.clone());
                        all_doc_ids.push((doc_offset + local_doc_id, *ordinal));
                    }
                }
                doc_offset += segment.num_docs();
            }

            if all_vectors.is_empty() {
                continue;
            }

            let dim = config.index_dim();

            // Extract just doc_ids for ANN indexes (they don't track ordinals yet)
            let ann_doc_ids: Vec<u32> = all_doc_ids.iter().map(|(doc_id, _)| *doc_id).collect();

            // Build ANN index based on index type and available trained structures
            match config.index_type {
                VectorIndexType::IvfRaBitQ => {
                    if let Some(centroids) = trained.centroids.get(&field.0) {
                        // Create RaBitQ codebook for the dimension
                        let rabitq_config = crate::structures::RaBitQConfig::new(dim);
                        let codebook = crate::structures::RaBitQCodebook::new(rabitq_config);

                        // Build IVF-RaBitQ index
                        let ivf_config = crate::structures::IVFRaBitQConfig::new(dim)
                            .with_store_raw(config.store_raw);
                        let ivf_index = crate::structures::IVFRaBitQIndex::build(
                            ivf_config,
                            centroids,
                            &codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::IVFRaBitQIndexData {
                            centroids: (**centroids).clone(),
                            codebook,
                            index: ivf_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ

                        log::info!(
                            "Built IVF-RaBitQ index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                VectorIndexType::ScaNN => {
                    if let (Some(centroids), Some(codebook)) = (
                        trained.centroids.get(&field.0),
                        trained.codebooks.get(&field.0),
                    ) {
                        // Build ScaNN (IVF-PQ) index
                        let ivf_pq_config = crate::structures::IVFPQConfig::new(dim);
                        let ivf_pq_index = crate::structures::IVFPQIndex::build(
                            ivf_pq_config,
                            centroids,
                            codebook,
                            &all_vectors,
                            Some(&ann_doc_ids),
                        );

                        let index_data = super::builder::ScaNNIndexData {
                            centroids: (**centroids).clone(),
                            codebook: (**codebook).clone(),
                            index: ivf_pq_index,
                        };
                        let bytes = index_data
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN

                        log::info!(
                            "Built ScaNN index for field {} with {} vectors",
                            field.0,
                            all_vectors.len()
                        );
                        continue;
                    }
                }
                _ => {}
            }

            // Fallback: keep as Flat if no trained structures available
            let flat_data = super::builder::FlatVectorData {
                dim,
                vectors: all_vectors,
                doc_ids: all_doc_ids,
            };
            let bytes = flat_data.to_binary_bytes();
            field_indexes.push((field.0, 4u8, bytes)); // 4 = Flat Binary
        }

        write_vector_file(dir, files, field_indexes).await
    }
}

/// Write a vector index file with per-field header + data.
/// Streams header then each field's data directly to disk, avoiding a single
/// giant concatenation buffer.
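///
/// Layout: `[num_fields: u32]`, then per field
/// `[field_id: u32][index_type: u8][offset: u64][length: u64]`, followed by
/// each field's serialized index bytes at the recorded offsets.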
async fn write_vector_file<D: Directory + DirectoryWriter>(
    dir: &D,
    files: &SegmentFiles,
    mut field_indexes: Vec<(u32, u8, Vec<u8>)>,
) -> Result<usize> {
    use byteorder::{LittleEndian, WriteBytesExt};

    if field_indexes.is_empty() {
        return Ok(0);
    }

    field_indexes.sort_by_key(|(id, _, _)| *id);

    let mut writer = OffsetWriter::new(dir.streaming_writer(&files.vectors).await?);

    // num_fields(u32) + per-field: field_id(u32) + index_type(u8) + offset(u64) + length(u64)
    let per_field_entry = size_of::<u32>() + size_of::<u8>() + size_of::<u64>() + size_of::<u64>();
    let header_size = size_of::<u32>() + field_indexes.len() * per_field_entry;
    writer.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

    let mut current_offset = header_size as u64;
    for (field_id, index_type, data) in &field_indexes {
        writer.write_u32::<LittleEndian>(*field_id)?;
        writer.write_u8(*index_type)?;
        writer.write_u64::<LittleEndian>(current_offset)?;
        writer.write_u64::<LittleEndian>(data.len() as u64)?;
        current_offset += data.len() as u64;
    }

    // Stream each field's data (drop after writing)
    for (_, _, data) in field_indexes {
        writer.write_all(&data)?;
    }

    let output_size = writer.offset() as usize;
    writer.finish()?;
    Ok(output_size)
}

/// Delete segment files from directory
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    // Best-effort: some files (e.g. positions, sparse) may not exist for
    // every segment, so errors are ignored.
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.positions).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    let _ = dir.delete(&files.sparse).await;
    Ok(())
}
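
// Illustrative sanity checks for the pure helper above. Only `format_bytes`
// is covered here; `doc_offsets` needs real `SegmentReader`s, so it is
// illustrated in its doc comment instead.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn format_bytes_picks_the_right_unit() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(512), "512 B");
        assert_eq!(format_bytes(1536), "1.50 KB");
        assert_eq!(format_bytes(3 * 1024 * 1024), "3.00 MB");
        assert_eq!(format_bytes(2 * 1024 * 1024 * 1024), "2.00 GB");
    }
}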