// hermes_core/segment/merger/mod.rs
mod dense;
4#[cfg(feature = "diagnostics")]
5mod diagnostics;
6mod fast_fields;
7mod postings;
8mod sparse;
9mod store;
10
11use std::sync::Arc;
12
13use rustc_hash::FxHashMap;
14
15use super::reader::SegmentReader;
16use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
17use super::{OffsetWriter, format_bytes};
18use crate::Result;
19use crate::directories::{Directory, DirectoryWriter};
20use crate::dsl::Schema;
21
22fn doc_offsets(segments: &[SegmentReader]) -> Result<Vec<u32>> {
26 let mut offsets = Vec::with_capacity(segments.len());
27 let mut acc = 0u32;
28 for seg in segments {
29 offsets.push(acc);
30 acc = acc.checked_add(seg.num_docs()).ok_or_else(|| {
31 crate::Error::Internal(format!(
32 "Total document count across segments exceeds u32::MAX ({})",
33 u32::MAX
34 ))
35 })?;
36 }
37 Ok(offsets)
38}
39
/// Counters accumulated while merging segments, used for logging/diagnostics.
///
/// Byte counts are the final sizes of the corresponding output files, taken
/// from each writer's offset when the merge phase completes.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms written during the postings merge.
    pub terms_processed: usize,
    /// Bytes written to the term dictionary file.
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file.
    pub postings_bytes: usize,
    /// Bytes written to the document store file.
    pub store_bytes: usize,
    /// Bytes written by the dense-vector merge.
    pub vectors_bytes: usize,
    /// Bytes written by the sparse-vector merge.
    pub sparse_bytes: usize,
    /// Bytes written by the fast-fields merge.
    pub fast_bytes: usize,
}
58
59impl std::fmt::Display for MergeStats {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(
62 f,
63 "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
64 self.terms_processed,
65 format_bytes(self.term_dict_bytes),
66 format_bytes(self.postings_bytes),
67 format_bytes(self.store_bytes),
68 format_bytes(self.vectors_bytes),
69 format_bytes(self.sparse_bytes),
70 format_bytes(self.fast_bytes),
71 )
72 }
73}
74
75pub use super::types::TrainedVectorStructures;
77
/// Merges multiple existing segments into a single new segment.
pub struct SegmentMerger {
    // Index schema, shared via Arc with the rest of the index.
    // NOTE(review): the merge submodules presumably consult this for
    // per-field handling — its use is not visible in this file.
    schema: Arc<Schema>,
}
82
impl SegmentMerger {
    /// Create a merger for segments written under `schema`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge `segments` into one new segment identified by `new_segment_id`,
    /// writing all output files into `dir`.
    ///
    /// The work runs in two concurrent stages (futures joined with
    /// `tokio::try_join!`, so they are polled concurrently on this task):
    /// 1. postings (term dict + postings + positions), document store,
    ///    and fast fields;
    /// 2. sparse vectors and dense vectors.
    ///
    /// `trained`: optional pre-built vector structures passed through to the
    /// dense-vector merge; when present the completion log calls this an
    /// "ANN merge".
    ///
    /// Returns the new segment's metadata together with per-phase statistics.
    ///
    /// # Errors
    /// Fails on any underlying I/O error, if the combined document count
    /// overflows `u32`, or if the merged store's doc count disagrees with
    /// the metadata total (data-integrity guard).
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let merge_start = std::time::Instant::now();

        // Stage 1a: merge the inverted index through offset-tracking
        // streaming writers (offsets double as final byte counts).
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            // Capture sizes before finish() consumes/flushes the writers.
            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // No positional data was written: drop the writer without
                // finishing and best-effort delete the empty positions file.
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        // Stage 1b: merge the document store; also reports how many docs it
        // actually wrote, checked against the metadata total below.
        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        // Stage 1c: merge fast (columnar) fields; yields bytes written.
        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        let (postings_result, store_result, fast_bytes) =
            tokio::try_join!(postings_fut, store_fut, fast_fut)?;

        log::info!(
            "[merge] stage 1 done in {:.1}s (postings + store + fast)",
            merge_start.elapsed().as_secs_f64()
        );

        // Stage 2: vector data, merged concurrently after stage 1 completes.
        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let (sparse_bytes, vectors_bytes) = tokio::try_join!(sparse_fut, dense_fut)?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // Field statistics merge additively: token and doc counts per field
        // are summed across all input segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        // Overflow-checked total; doc ids in this index are u32.
        let total_docs: u32 = segments
            .iter()
            .try_fold(0u32, |acc, s| acc.checked_add(s.num_docs()))
            .ok_or_else(|| {
                crate::Error::Internal(format!(
                    "Total document count exceeds u32::MAX ({})",
                    u32::MAX
                ))
            })?;

        // Integrity guard: the store must contain exactly the number of
        // docs the metadata will advertise; otherwise doc ids would be
        // misaligned. Log per-segment detail, then abort the merge.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        // Metadata is written last, after all data files succeeded.
        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
272}
273
274pub async fn delete_segment<D: Directory + DirectoryWriter>(
276 dir: &D,
277 segment_id: SegmentId,
278) -> Result<()> {
279 let files = SegmentFiles::new(segment_id.0);
280 let _ = tokio::join!(
281 dir.delete(&files.term_dict),
282 dir.delete(&files.postings),
283 dir.delete(&files.store),
284 dir.delete(&files.meta),
285 dir.delete(&files.vectors),
286 dir.delete(&files.sparse),
287 dir.delete(&files.positions),
288 dir.delete(&files.fast),
289 );
290 Ok(())
291}