mod dense;
4#[cfg(feature = "diagnostics")]
5mod diagnostics;
6mod fast_fields;
7mod postings;
8mod sparse;
9mod store;
10
11use std::sync::Arc;
12
13use rustc_hash::FxHashMap;
14
15use super::reader::SegmentReader;
16use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
17use super::{OffsetWriter, format_bytes};
18use crate::Result;
19use crate::directories::{Directory, DirectoryWriter};
20use crate::dsl::Schema;
21
22fn doc_offsets(segments: &[SegmentReader]) -> Result<Vec<u32>> {
26 let mut offsets = Vec::with_capacity(segments.len());
27 let mut acc = 0u32;
28 for seg in segments {
29 offsets.push(acc);
30 acc = acc.checked_add(seg.num_docs()).ok_or_else(|| {
31 crate::Error::Internal(format!(
32 "Total document count across segments exceeds u32::MAX ({})",
33 u32::MAX
34 ))
35 })?;
36 }
37 Ok(offsets)
38}
39
/// Size and term counters accumulated over one segment merge.
///
/// Byte fields are the final write offsets of the corresponding output
/// files; rendered human-readably by the `Display` impl.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed while merging the postings lists.
    pub terms_processed: usize,
    /// Bytes written to the merged term dictionary file.
    pub term_dict_bytes: usize,
    /// Bytes written to the merged postings file.
    pub postings_bytes: usize,
    /// Bytes written to the merged document store file.
    pub store_bytes: usize,
    /// Bytes produced by the dense-vector merge.
    pub vectors_bytes: usize,
    /// Bytes produced by the sparse-vector merge.
    pub sparse_bytes: usize,
    /// Bytes produced by the fast-fields merge.
    pub fast_bytes: usize,
}
58
59impl std::fmt::Display for MergeStats {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(
62 f,
63 "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
64 self.terms_processed,
65 format_bytes(self.term_dict_bytes),
66 format_bytes(self.postings_bytes),
67 format_bytes(self.store_bytes),
68 format_bytes(self.vectors_bytes),
69 format_bytes(self.sparse_bytes),
70 format_bytes(self.fast_bytes),
71 )
72 }
73}
74
75pub use super::types::TrainedVectorStructures;
77
/// Merges multiple existing segments into a single new segment.
pub struct SegmentMerger {
    // Index schema; presumably consulted by the per-component merge
    // routines in the sibling modules (postings, store, vectors, ...) —
    // not used directly in this file.
    schema: Arc<Schema>,
}
82
impl SegmentMerger {
    /// Creates a merger for segments that follow `schema`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merges `segments` into one new segment written under `new_segment_id`.
    ///
    /// The component merges — postings/term dictionary/positions, document
    /// store, dense vectors, sparse vectors, and fast fields — all run
    /// concurrently. `trained` optionally supplies pre-built vector
    /// structures for the dense-vector merge (an "ANN merge").
    ///
    /// Returns the new segment's metadata (already persisted to the
    /// directory) together with size statistics for each component.
    ///
    /// # Errors
    /// Fails if any component merge or write fails, if the total document
    /// count overflows `u32`, or if the merged store's document count
    /// disagrees with the count derived from segment metadata.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let merge_start = std::time::Instant::now();

        // Postings phase: streams the merged term dictionary, postings and
        // positions to disk, tracking each file's size via OffsetWriter.
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            // Snapshot sizes before finish() consumes/flushes the writers.
            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // No positions were written: drop the writer without
                // finishing it and best-effort delete the empty file.
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        // Store phase: also reports how many documents it actually wrote,
        // which is cross-checked against segment metadata below.
        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        // Run all phases concurrently; sparse + fast are joined as an inner
        // pair so the outer try_join! yields a four-element tuple.
        let (postings_result, store_result, vectors_bytes, (sparse_bytes, fast_bytes)) =
            tokio::try_join!(postings_fut, store_fut, dense_fut, async {
                tokio::try_join!(sparse_fut, fast_fut)
            })?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // Aggregate per-field statistics by summing across input segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        // Total doc count, with overflow guarded (doc ids are u32).
        let total_docs: u32 = segments
            .iter()
            .try_fold(0u32, |acc, s| acc.checked_add(s.num_docs()))
            .ok_or_else(|| {
                crate::Error::Internal(format!(
                    "Total document count exceeds u32::MAX ({})",
                    u32::MAX
                ))
            })?;

        // Sanity check: the store must contain exactly the documents the
        // metadata claims, otherwise the merged segment is corrupt.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
257
258pub async fn delete_segment<D: Directory + DirectoryWriter>(
260 dir: &D,
261 segment_id: SegmentId,
262) -> Result<()> {
263 let files = SegmentFiles::new(segment_id.0);
264 let _ = tokio::join!(
265 dir.delete(&files.term_dict),
266 dir.delete(&files.postings),
267 dir.delete(&files.store),
268 dir.delete(&files.meta),
269 dir.delete(&files.vectors),
270 dir.delete(&files.sparse),
271 dir.delete(&files.positions),
272 dir.delete(&files.fast),
273 );
274 Ok(())
275}