// hermes_core/segment/merger/mod.rs

1//! Segment merger for combining multiple segments
2
3mod dense;
4#[cfg(feature = "diagnostics")]
5mod diagnostics;
6mod fast_fields;
7mod postings;
8mod sparse;
9mod store;
10
11use std::io::Write;
12use std::sync::Arc;
13
14use rustc_hash::FxHashMap;
15
16use super::reader::SegmentReader;
17use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
18use crate::Result;
19use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
20use crate::dsl::Schema;
21
/// Write adapter that tracks bytes written.
///
/// Concrete type so it works with generic `serialize<W: Write>` functions
/// (unlike `dyn StreamingWriter` which isn't `Sized`).
pub(crate) struct OffsetWriter {
    /// Destination that receives all bytes.
    inner: Box<dyn StreamingWriter>,
    /// Total bytes written so far; advanced by the `Write` impl below.
    offset: u64,
}
30
31impl OffsetWriter {
32    fn new(inner: Box<dyn StreamingWriter>) -> Self {
33        Self { inner, offset: 0 }
34    }
35
36    /// Current write position (total bytes written so far).
37    fn offset(&self) -> u64 {
38        self.offset
39    }
40
41    /// Finalize the underlying streaming writer.
42    fn finish(self) -> std::io::Result<()> {
43        self.inner.finish()
44    }
45}
46
47impl Write for OffsetWriter {
48    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
49        let n = self.inner.write(buf)?;
50        self.offset += n as u64;
51        Ok(n)
52    }
53
54    fn flush(&mut self) -> std::io::Result<()> {
55        self.inner.flush()
56    }
57}
58
/// Format a byte count as a human-readable string (B / KB / MB / GB,
/// two decimal places above the 1 KB threshold).
fn format_bytes(bytes: usize) -> String {
    const KB: usize = 1024;
    const MB: usize = KB * 1024;
    const GB: usize = MB * 1024;
    match bytes {
        b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
        b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
        b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
        b => format!("{} B", b),
    }
}
71
72/// Compute per-segment doc ID offsets (each segment's docs start after the previous)
73fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
74    let mut offsets = Vec::with_capacity(segments.len());
75    let mut acc = 0u32;
76    for seg in segments {
77        offsets.push(acc);
78        acc += seg.num_docs();
79    }
80    offsets
81}
82
/// Statistics for merge operations (counts and per-file output sizes).
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Term dictionary output size in bytes
    pub term_dict_bytes: usize,
    /// Postings output size in bytes
    pub postings_bytes: usize,
    /// Store output size in bytes
    pub store_bytes: usize,
    /// Vector index output size in bytes
    pub vectors_bytes: usize,
    /// Sparse vector index output size in bytes
    pub sparse_bytes: usize,
    /// Fast-field output size in bytes
    pub fast_bytes: usize,
}
101
102impl std::fmt::Display for MergeStats {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        write!(
105            f,
106            "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
107            self.terms_processed,
108            format_bytes(self.term_dict_bytes),
109            format_bytes(self.postings_bytes),
110            format_bytes(self.store_bytes),
111            format_bytes(self.vectors_bytes),
112            format_bytes(self.sparse_bytes),
113            format_bytes(self.fast_bytes),
114        )
115    }
116}
117
118// TrainedVectorStructures is defined in super::types (available on all platforms)
119pub use super::types::TrainedVectorStructures;
120
/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    // Index schema shared by all segments being merged; used by the
    // merge phase implementations in the sibling submodules.
    schema: Arc<Schema>,
}
125
impl SegmentMerger {
    /// Create a merger for segments that share `schema`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments into one, streaming postings/positions/store directly to files.
    ///
    /// If `trained` is provided, dense vectors use O(1) cluster merge when possible
    /// (homogeneous IVF/ScaNN), otherwise rebuilds ANN from trained structures.
    /// Without trained structures, only flat vectors are merged.
    ///
    /// Uses streaming writers so postings, positions, and store data flow directly
    /// to files instead of buffering everything in memory. Only the term dictionary
    /// (compact key+TermInfo entries) is buffered.
    ///
    /// Returns the new segment's metadata and per-phase output statistics.
    /// Errors if the merged store's doc count disagrees with the summed source
    /// segment metadata (see the mismatch check below).
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // === All 4 phases concurrent: postings || store || dense || sparse ===
        // Each phase reads from independent parts of source segments (SSTable,
        // store blocks, flat vectors, sparse index) and writes to independent
        // output files. No phase consumes another's output.
        // tokio::try_join! interleaves I/O across the four futures on the same task.
        let merge_start = std::time::Instant::now();

        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            // Capture sizes before `finish()` consumes the writers.
            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // Nothing was written: drop the writer and remove the empty
                // positions file (best-effort — a failed delete is ignored).
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        // First error from any phase aborts the join and propagates via `?`.
        let (postings_result, store_result, vectors_bytes, (sparse_bytes, fast_bytes)) =
            tokio::try_join!(postings_fut, store_fut, dense_fut, async {
                tokio::try_join!(sparse_fut, fast_fut)
            })?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // === Mandatory: merge field stats + write meta ===
        // Per-field token/doc totals are additive across source segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();

        // Verify store doc count matches metadata — a mismatch here means
        // some store blocks were lost (e.g., compression thread panic) or
        // source segment metadata disagrees with its store.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
292
293/// Delete segment files from directory (all deletions run concurrently).
294pub async fn delete_segment<D: Directory + DirectoryWriter>(
295    dir: &D,
296    segment_id: SegmentId,
297) -> Result<()> {
298    let files = SegmentFiles::new(segment_id.0);
299    let _ = tokio::join!(
300        dir.delete(&files.term_dict),
301        dir.delete(&files.postings),
302        dir.delete(&files.store),
303        dir.delete(&files.meta),
304        dir.delete(&files.vectors),
305        dir.delete(&files.sparse),
306        dir.delete(&files.positions),
307        dir.delete(&files.fast),
308    );
309    Ok(())
310}