// hermes_core/segment/merger/mod.rs
mod dense;
4#[cfg(feature = "diagnostics")]
5mod diagnostics;
6mod fast_fields;
7mod postings;
8mod sparse;
9mod store;
10
11use std::sync::Arc;
12
13use rustc_hash::FxHashMap;
14
15use super::reader::SegmentReader;
16use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
17use super::{OffsetWriter, format_bytes};
18use crate::Result;
19use crate::directories::{Directory, DirectoryWriter};
20use crate::dsl::Schema;
21
22fn doc_offsets(segments: &[SegmentReader]) -> Result<Vec<u32>> {
26 let mut offsets = Vec::with_capacity(segments.len());
27 let mut acc = 0u32;
28 for seg in segments {
29 offsets.push(acc);
30 acc = acc.checked_add(seg.num_docs()).ok_or_else(|| {
31 crate::Error::Internal(format!(
32 "Total document count across segments exceeds u32::MAX ({})",
33 u32::MAX
34 ))
35 })?;
36 }
37 Ok(offsets)
38}
39
/// Counters accumulated during a segment merge, used for logging/diagnostics.
///
/// All `*_bytes` fields are the number of bytes written for the corresponding
/// output file of the merged segment.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed while merging the postings lists.
    pub terms_processed: usize,
    /// Bytes written to the term dictionary file.
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file.
    pub postings_bytes: usize,
    /// Bytes written to the document store file.
    pub store_bytes: usize,
    /// Bytes written for dense vectors.
    pub vectors_bytes: usize,
    /// Bytes written for sparse vectors.
    pub sparse_bytes: usize,
    /// Bytes written for fast fields.
    pub fast_bytes: usize,
}
58
59impl std::fmt::Display for MergeStats {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(
62 f,
63 "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
64 self.terms_processed,
65 format_bytes(self.term_dict_bytes),
66 format_bytes(self.postings_bytes),
67 format_bytes(self.store_bytes),
68 format_bytes(self.vectors_bytes),
69 format_bytes(self.sparse_bytes),
70 format_bytes(self.fast_bytes),
71 )
72 }
73}
74
75pub use super::types::TrainedVectorStructures;
77
/// Merges multiple existing segments into a single new segment.
pub struct SegmentMerger {
    /// Index schema shared across the merge; presumably consumed by the
    /// per-component merge impls in the submodules — not used in this file.
    schema: Arc<Schema>,
    /// When true, forces SimHash-based reordering during merge.
    /// NOTE(review): consumed outside this chunk (likely the store/dense
    /// submodules) — confirm against those impls.
    force_simhash_reorder: bool,
}
84
impl SegmentMerger {
    /// Creates a merger for the given schema with SimHash reordering disabled.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self {
            schema,
            force_simhash_reorder: false,
        }
    }

    /// Builder-style setter: force (or un-force) SimHash-based reordering.
    pub fn with_force_simhash_reorder(mut self, force: bool) -> Self {
        self.force_simhash_reorder = force;
        self
    }

    /// Merges `segments` into a single new segment written under
    /// `new_segment_id`, returning its metadata and per-component stats.
    ///
    /// Runs in two concurrent stages:
    /// 1. postings (+ term dict + positions), document store, and fast fields;
    /// 2. sparse and dense vectors (`trained`, when present, is passed to the
    ///    dense-vector merge — used for ANN structures).
    ///
    /// Per-field stats are summed across input segments, and the store's doc
    /// count is cross-checked against the metadata total before the merged
    /// meta file is written.
    ///
    /// # Errors
    /// Fails if any component merge or write fails, if the total doc count
    /// overflows `u32`, or if the store/meta doc counts disagree.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let merge_start = std::time::Instant::now();

        // Stage-1 future: merge postings into three streaming outputs
        // (term dict, postings, positions).
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            // Capture byte counts before finish() consumes the writers.
            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // No positions were written: drop the writer unfinished and
                // best-effort delete the (empty) positions file.
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        // Stage-1 future: merge the document store; also reports how many
        // docs it actually wrote (validated against metadata below).
        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        // Stage-1 future: merge fast fields (writes directly via `dir`).
        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        let (postings_result, store_result, fast_bytes) =
            tokio::try_join!(postings_fut, store_fut, fast_fut)?;

        log::info!(
            "[merge] stage 1 done in {:.1}s (postings + store + fast)",
            merge_start.elapsed().as_secs_f64()
        );

        // Stage 2: sparse and dense vectors, again concurrently.
        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let (sparse_bytes, vectors_bytes) = tokio::try_join!(sparse_fut, dense_fut)?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // Sum per-field token/doc stats across all input segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        // Total doc count with overflow check (doc ids are u32).
        let total_docs: u32 = segments
            .iter()
            .try_fold(0u32, |acc, s| acc.checked_add(s.num_docs()))
            .ok_or_else(|| {
                crate::Error::Internal(format!(
                    "Total document count exceeds u32::MAX ({})",
                    u32::MAX
                ))
            })?;

        // Sanity check: the store must contain exactly the number of docs the
        // metadata claims; otherwise the merged segment would be corrupt.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        // Metadata is written last, after all component files succeeded.
        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
284
285pub async fn delete_segment<D: Directory + DirectoryWriter>(
287 dir: &D,
288 segment_id: SegmentId,
289) -> Result<()> {
290 let files = SegmentFiles::new(segment_id.0);
291 let _ = tokio::join!(
292 dir.delete(&files.term_dict),
293 dir.delete(&files.postings),
294 dir.delete(&files.store),
295 dir.delete(&files.meta),
296 dir.delete(&files.vectors),
297 dir.delete(&files.sparse),
298 dir.delete(&files.positions),
299 dir.delete(&files.fast),
300 );
301 Ok(())
302}