hermes_core/segment/merger/mod.rs

//! Segment merger for combining multiple segments

mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
mod fast_fields;
mod postings;
mod sparse;
mod store;

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use super::{OffsetWriter, format_bytes};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;

/// Compute per-segment doc ID offsets (each segment's docs start after the previous).
///
/// Returns an error if the total document count across segments exceeds `u32::MAX`.
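///
/// For example, segments holding 10, 20, and 5 docs yield offsets `[0, 10, 30]`.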
fn doc_offsets(segments: &[SegmentReader]) -> Result<Vec<u32>> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc = acc.checked_add(seg.num_docs()).ok_or_else(|| {
            crate::Error::Internal(format!(
                "Total document count across segments exceeds u32::MAX ({})",
                u32::MAX
            ))
        })?;
    }
    Ok(offsets)
}

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
    /// Fast-field output size
    pub fast_bytes: usize,
}

impl std::fmt::Display for MergeStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            self.terms_processed,
            format_bytes(self.term_dict_bytes),
            format_bytes(self.postings_bytes),
            format_bytes(self.store_bytes),
            format_bytes(self.vectors_bytes),
            format_bytes(self.sparse_bytes),
            format_bytes(self.fast_bytes),
        )
    }
}

// TrainedVectorStructures is defined in super::types (available on all platforms)
pub use super::types::TrainedVectorStructures;

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
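    /// Create a merger for segments described by `schema`.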
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    ///
    /// If `trained` is provided, dense vectors use O(1) cluster merge when possible
    /// (homogeneous IVF/ScaNN); otherwise the ANN index is rebuilt from the trained
    /// structures. Without trained structures, only flat vectors are merged.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
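    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest): `dir` is any
    /// `Directory + DirectoryWriter` implementation, `readers` holds the open
    /// `SegmentReader`s to merge, and `new_segment_id` is the target
    /// `SegmentId`; all are assumed to exist in the caller's context.
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(Arc::clone(&schema));
    /// // `None`: no trained vector structures, so only flat vectors are merged.
    /// let (meta, stats) = merger
    ///     .merge(&dir, &readers, new_segment_id, None)
    ///     .await?;
    /// log::info!("merged {} docs: {}", meta.num_docs, stats);
    /// ```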
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === All 5 phases concurrent: postings || store || dense || sparse || fast ===
        // Each phase reads from independent parts of source segments (SSTable,
        // store blocks, flat vectors, sparse index, fast fields) and writes to
        // independent output files. No phase consumes another's output.
        // tokio::try_join! interleaves I/O across all five phase futures on the same task.
        let merge_start = std::time::Instant::now();

        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

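        // sparse_fut and fast_fut are nested into their own try_join! so the
        // outer join yields their byte counts as a single (sparse, fast) tuple.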
        let (postings_result, store_result, vectors_bytes, (sparse_bytes, fast_bytes)) =
            tokio::try_join!(postings_fut, store_fut, dense_fut, async {
                tokio::try_join!(sparse_fut, fast_fut)
            })?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments
            .iter()
            .try_fold(0u32, |acc, s| acc.checked_add(s.num_docs()))
            .ok_or_else(|| {
                crate::Error::Internal(format!(
                    "Total document count exceeds u32::MAX ({})",
                    u32::MAX
                ))
            })?;

        // Verify store doc count matches metadata — a mismatch here means
        // some store blocks were lost (e.g., compression thread panic) or
        // source segment metadata disagrees with its store.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}

/// Delete segment files from directory (all deletions run concurrently).
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = tokio::join!(
        dir.delete(&files.term_dict),
        dir.delete(&files.postings),
        dir.delete(&files.store),
        dir.delete(&files.meta),
        dir.delete(&files.vectors),
        dir.delete(&files.sparse),
        dir.delete(&files.positions),
        dir.delete(&files.fast),
    );
    Ok(())
}
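
// A minimal smoke-test sketch for the `MergeStats` Display impl; it only
// asserts on the `terms=` field so it stays independent of `format_bytes`'s
// exact output.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn merge_stats_display_starts_with_term_count() {
        let stats = MergeStats {
            terms_processed: 42,
            ..MergeStats::default()
        };
        assert!(stats.to_string().starts_with("terms=42"));
    }
}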