// hermes_core/segment/merger/mod.rs
1//! Segment merger for combining multiple segments
2
3mod dense;
4#[cfg(feature = "diagnostics")]
5mod diagnostics;
6mod fast_fields;
7mod postings;
8mod sparse;
9mod store;
10
11use std::sync::Arc;
12
13use rustc_hash::FxHashMap;
14
15use super::reader::SegmentReader;
16use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
17use super::{OffsetWriter, format_bytes};
18use crate::Result;
19use crate::directories::{Directory, DirectoryWriter};
20use crate::dsl::Schema;
21
22/// Compute per-segment doc ID offsets (each segment's docs start after the previous).
23///
24/// Returns an error if the total document count across segments exceeds `u32::MAX`.
25fn doc_offsets(segments: &[SegmentReader]) -> Result<Vec<u32>> {
26    let mut offsets = Vec::with_capacity(segments.len());
27    let mut acc = 0u32;
28    for seg in segments {
29        offsets.push(acc);
30        acc = acc.checked_add(seg.num_docs()).ok_or_else(|| {
31            crate::Error::Internal(format!(
32                "Total document count across segments exceeds u32::MAX ({})",
33                u32::MAX
34            ))
35        })?;
36    }
37    Ok(offsets)
38}
39
/// Counters and output sizes accumulated over a single merge run.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// How many distinct terms were processed while merging postings
    pub terms_processed: usize,
    /// Bytes written to the term dictionary file
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file
    pub postings_bytes: usize,
    /// Bytes written to the document store file
    pub store_bytes: usize,
    /// Bytes written to the dense vector index file
    pub vectors_bytes: usize,
    /// Bytes written to the sparse vector index file
    pub sparse_bytes: usize,
    /// Bytes written to the fast-field file
    pub fast_bytes: usize,
}
58
59impl std::fmt::Display for MergeStats {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(
62            f,
63            "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
64            self.terms_processed,
65            format_bytes(self.term_dict_bytes),
66            format_bytes(self.postings_bytes),
67            format_bytes(self.store_bytes),
68            format_bytes(self.vectors_bytes),
69            format_bytes(self.sparse_bytes),
70            format_bytes(self.fast_bytes),
71        )
72    }
73}
74
75// TrainedVectorStructures is defined in super::types (available on all platforms)
76pub use super::types::TrainedVectorStructures;
77
78/// Segment merger - merges multiple segments into one
79pub struct SegmentMerger {
80    schema: Arc<Schema>,
81    /// When true, force record-level BP reorder on all BMP fields.
82    force_reorder: bool,
83}
84
impl SegmentMerger {
    /// Create a merger for the given schema; forced BP reorder is disabled by default.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self {
            schema,
            force_reorder: false,
        }
    }

    /// Enable forced BP reorder on all BMP fields during merge.
    pub fn with_force_reorder(mut self, force: bool) -> Self {
        self.force_reorder = force;
        self
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    ///
    /// If `trained` is provided, dense vectors use O(1) cluster merge when possible
    /// (homogeneous IVF/ScaNN), otherwise rebuilds ANN from trained structures.
    /// Without trained structures, only flat vectors are merged.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
    ///
    /// # Errors
    /// Fails if any sub-merge fails, if the combined doc count overflows `u32`,
    /// or if the merged store's doc count disagrees with segment metadata.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === Two-stage merge to bound page cache pressure ===
        //
        // Stage 1: postings + store + fast_fields (concurrent)
        //   Touches .term_dict, .postings, .positions, .store, .fast files.
        //
        // Stage 2: sparse + dense vectors (concurrent)
        //   Touches .sparse, .vectors files.
        //
        // Running all phases concurrently caused OOM on large merges because
        // mmap'd source files from all 16+ segments compete for page cache
        // simultaneously (200+ GB of mmap'd data for BMP grids alone).
        // Two stages halve the concurrent working set.
        let merge_start = std::time::Instant::now();

        // ── Stage 1: text + store + fast fields ─────────────────────────
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            // Capture byte counts before finish() consumes the writers.
            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            // Keep the positions file only if anything was written; otherwise
            // remove the empty file (best-effort — the delete error is ignored).
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        // try_join! aborts stage 1 on the first error from any phase.
        let (postings_result, store_result, fast_bytes) =
            tokio::try_join!(postings_fut, store_fut, fast_fut)?;

        log::info!(
            "[merge] stage 1 done in {:.1}s (postings + store + fast)",
            merge_start.elapsed().as_secs_f64()
        );

        // ── Stage 2: sparse + dense vectors ─────────────────────────────
        // Page cache from stage 1 files can now be evicted by the kernel
        // as stage 2 accesses different mmap regions (.sparse, .vectors).
        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let (sparse_bytes, vectors_bytes) = tokio::try_join!(sparse_fut, dense_fut)?;
        // Fold every phase's output into the summary stats.
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // === Mandatory: merge field stats + write meta ===
        // Per-field token/doc counts are additive across segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        // Sum doc counts with overflow checking (doc ids are u32).
        let total_docs: u32 = segments
            .iter()
            .try_fold(0u32, |acc, s| acc.checked_add(s.num_docs()))
            .ok_or_else(|| {
                crate::Error::Internal(format!(
                    "Total document count exceeds u32::MAX ({})",
                    u32::MAX
                ))
            })?;

        // Verify store doc count matches metadata — a mismatch here means
        // some store blocks were lost (e.g., compression thread panic) or
        // source segment metadata disagrees with its store.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        // Persist segment metadata last, after all data files are finished.
        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
284
285/// Delete segment files from directory (all deletions run concurrently).
286pub async fn delete_segment<D: Directory + DirectoryWriter>(
287    dir: &D,
288    segment_id: SegmentId,
289) -> Result<()> {
290    let files = SegmentFiles::new(segment_id.0);
291    let _ = tokio::join!(
292        dir.delete(&files.term_dict),
293        dir.delete(&files.postings),
294        dir.delete(&files.store),
295        dir.delete(&files.meta),
296        dir.delete(&files.vectors),
297        dir.delete(&files.sparse),
298        dir.delete(&files.positions),
299        dir.delete(&files.fast),
300    );
301    Ok(())
302}