hermes_core/segment/merger/mod.rs

//! Segment merger for combining multiple segments

mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
mod fast_fields;
mod postings;
mod sparse;
mod store;

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use super::{OffsetWriter, format_bytes};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;

/// Compute per-segment doc ID offsets (each segment's docs start after the previous).
///
/// Returns an error if the total document count across segments exceeds `u32::MAX`.
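///
/// Worked example (hypothetical counts): segments holding 10, 5, and 7 docs
/// yield offsets `[0, 10, 15]`, so local doc 3 of the third segment maps to
/// global doc ID `15 + 3 = 18` in the merged segment.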
fn doc_offsets(segments: &[SegmentReader]) -> Result<Vec<u32>> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc = acc.checked_add(seg.num_docs()).ok_or_else(|| {
            crate::Error::Internal(format!(
                "Total document count across segments exceeds u32::MAX ({})",
                u32::MAX
            ))
        })?;
    }
    Ok(offsets)
}

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
    /// Fast-field output size
    pub fast_bytes: usize,
}

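// A merge summary renders as a single line, e.g. (hypothetical values; the
// exact size formatting comes from `format_bytes`):
//   terms=120000, term_dict=1.5 MB, postings=80.0 MB, store=40.0 MB, ...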
impl std::fmt::Display for MergeStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            self.terms_processed,
            format_bytes(self.term_dict_bytes),
            format_bytes(self.postings_bytes),
            format_bytes(self.store_bytes),
            format_bytes(self.vectors_bytes),
            format_bytes(self.sparse_bytes),
            format_bytes(self.fast_bytes),
        )
    }
}

// TrainedVectorStructures is defined in super::types (available on all platforms)
pub use super::types::TrainedVectorStructures;

/// Segment merger: merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    /// Create a merger for the given schema.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    ///
    /// If `trained` is provided, dense vectors use an O(1) cluster merge when
    /// possible (homogeneous IVF/ScaNN); otherwise the ANN index is rebuilt from
    /// the trained structures. Without trained structures, only flat vectors are
    /// merged.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
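    ///
    /// Call-site sketch (assuming an open directory, loaded segment readers, and
    /// a freshly allocated `SegmentId`; names and error handling are illustrative):
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(schema.clone());
    /// let (meta, stats) = merger.merge(&dir, &readers, new_id, None).await?;
    /// log::info!("merged {} docs: {}", meta.num_docs, stats);
    /// ```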
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === Two-stage merge to bound page cache pressure ===
        //
        // Stage 1: postings + store + fast_fields (concurrent)
        //   Touches .term_dict, .postings, .positions, .store, .fast files.
        //
        // Stage 2: sparse + dense vectors (concurrent)
        //   Touches .sparse, .vectors files.
        //
        // Running all phases concurrently caused OOM on large merges because
        // mmap'd source files from all 16+ segments compete for page cache
        // simultaneously (200+ GB of mmap'd data for BMP grids alone).
        // Two stages halve the concurrent working set.
        let merge_start = std::time::Instant::now();

        // ── Stage 1: text + store + fast fields ─────────────────────────
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // No positions were written: drop the empty writer and remove the file.
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        let (postings_result, store_result, fast_bytes) =
            tokio::try_join!(postings_fut, store_fut, fast_fut)?;

        log::info!(
            "[merge] stage 1 done in {:.1}s (postings + store + fast)",
            merge_start.elapsed().as_secs_f64()
        );

        // ── Stage 2: sparse + dense vectors ─────────────────────────────
        // Page cache from stage 1 files can now be evicted by the kernel
        // as stage 2 accesses different mmap regions (.sparse, .vectors).
        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let (sparse_bytes, vectors_bytes) = tokio::try_join!(sparse_fut, dense_fut)?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments
            .iter()
            .try_fold(0u32, |acc, s| acc.checked_add(s.num_docs()))
            .ok_or_else(|| {
                crate::Error::Internal(format!(
                    "Total document count exceeds u32::MAX ({})",
                    u32::MAX
                ))
            })?;

        // Verify store doc count matches metadata: a mismatch here means
        // some store blocks were lost (e.g., compression thread panic) or
        // source segment metadata disagrees with its store.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}

/// Delete a segment's files from the directory (all deletions run concurrently).
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
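    // Best-effort cleanup: results are discarded, so already-missing files
    // don't fail the call.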
    let _ = tokio::join!(
        dir.delete(&files.term_dict),
        dir.delete(&files.postings),
        dir.delete(&files.store),
        dir.delete(&files.meta),
        dir.delete(&files.vectors),
        dir.delete(&files.sparse),
        dir.delete(&files.positions),
        dir.delete(&files.fast),
    );
    Ok(())
}