hermes_core/segment/merger/mod.rs

//! Segment merger for combining multiple segments

mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
mod fast_fields;
mod postings;
mod sparse;
mod store;

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use super::{OffsetWriter, format_bytes};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::Schema;

/// Compute per-segment doc ID offsets (each segment's docs start after the previous)
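///
/// A sketch with hypothetical doc counts: segments holding 10, 5, and 3 docs
/// map to offsets `[0, 10, 15]`, so segment 1's docs are renumbered to 10..15
/// and segment 2's to 15..18 in the merged doc ID space.
///
/// ```ignore
/// // Hypothetical readers; `seg_a/b/c` stand in for real SegmentReaders
/// // with 10, 5, and 3 docs respectively.
/// let offsets = doc_offsets(&[seg_a, seg_b, seg_c]);
/// assert_eq!(offsets, vec![0, 10, 15]);
/// ```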
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}

/// Statistics for merge operations
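///
/// `Display` renders the stats on one line; a hypothetical example (the values
/// and the exact byte formatting, which comes from `format_bytes`, are made up):
///
/// ```text
/// terms=15384, term_dict=512.3 KB, postings=24.7 MB, store=96.1 MB, vectors=0 B, sparse=0 B, fast=1.2 MB
/// ```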
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
    /// Sparse vector index output size
    pub sparse_bytes: usize,
    /// Fast-field output size
    pub fast_bytes: usize,
}

impl std::fmt::Display for MergeStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            self.terms_processed,
            format_bytes(self.term_dict_bytes),
            format_bytes(self.postings_bytes),
            format_bytes(self.store_bytes),
            format_bytes(self.vectors_bytes),
            format_bytes(self.sparse_bytes),
            format_bytes(self.fast_bytes),
        )
    }
}

// TrainedVectorStructures is defined in super::types (available on all platforms)
pub use super::types::TrainedVectorStructures;

/// Segment merger: merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    ///
    /// If `trained` is provided, dense vectors use an O(1) cluster merge when
    /// possible (homogeneous IVF/ScaNN); otherwise the ANN index is rebuilt from
    /// the trained structures. Without trained structures, only flat vectors are
    /// merged.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
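    ///
    /// A minimal usage sketch (the `dir`, `readers`, and segment ID below are
    /// hypothetical placeholders, not fixtures from this crate):
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(schema.clone());
    /// // `None` for trained structures: only flat vectors are merged.
    /// let (meta, stats) = merger
    ///     .merge(&dir, &readers, SegmentId(7), None)
    ///     .await?;
    /// log::info!("merged {} docs: {}", meta.num_docs, stats);
    /// ```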
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === All five phases concurrent: postings || store || dense || sparse || fast ===
        // Each phase reads from independent parts of the source segments (SSTable,
        // store blocks, flat vectors, sparse index, fast fields) and writes to
        // independent output files. No phase consumes another's output.
        // tokio::try_join! interleaves I/O across the futures on the same task.
        let merge_start = std::time::Instant::now();

        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        let (postings_result, store_result, vectors_bytes, (sparse_bytes, fast_bytes)) =
            tokio::try_join!(postings_fut, store_fut, dense_fut, async {
                tokio::try_join!(sparse_fut, fast_fut)
            })?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // === Mandatory: merge field stats + write meta ===
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();

        // Verify the store doc count matches the metadata: a mismatch here means
        // some store blocks were lost (e.g., a compression thread panic) or a
        // source segment's metadata disagrees with its store.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}

/// Delete segment files from directory (all deletions run concurrently).
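///
/// A usage sketch (the directory and segment ID are hypothetical):
///
/// ```ignore
/// // Best-effort cleanup: individual delete results are discarded via
/// // `tokio::join!`, so segments missing optional files (positions,
/// // vectors, ...) are still cleaned up.
/// delete_segment(&dir, SegmentId(7)).await?;
/// ```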
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = tokio::join!(
        dir.delete(&files.term_dict),
        dir.delete(&files.postings),
        dir.delete(&files.store),
        dir.delete(&files.meta),
        dir.delete(&files.vectors),
        dir.delete(&files.sparse),
        dir.delete(&files.positions),
        dir.delete(&files.fast),
    );
    Ok(())
}