hermes_core/segment/merger/mod.rs

mod dense;
#[cfg(feature = "diagnostics")]
mod diagnostics;
mod fast_fields;
mod postings;
mod sparse;
mod store;

use std::io::Write;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
use crate::dsl::Schema;

/// A `StreamingWriter` wrapper that tracks how many bytes have been written,
/// so each merge phase can report the size of its output.
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    offset: u64,
}

impl OffsetWriter {
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    /// Total number of bytes written so far.
    fn offset(&self) -> u64 {
        self.offset
    }

    /// Finalizes the underlying writer.
    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}

impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}
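
// A minimal sketch of the byte-counting pattern above, using a plain
// `Vec<u8>` sink instead of a `StreamingWriter` (whose definition lives in
// `crate::directories` and is not repeated here). `CountingWriter` is a
// hypothetical stand-in for illustration only, not part of this module's API.
#[cfg(test)]
mod offset_counting_sketch {
    use std::io::Write;

    struct CountingWriter<W: Write> {
        inner: W,
        offset: u64,
    }

    impl<W: Write> Write for CountingWriter<W> {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            // Count only the bytes the inner writer actually accepted.
            let n = self.inner.write(buf)?;
            self.offset += n as u64;
            Ok(n)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            self.inner.flush()
        }
    }

    #[test]
    fn offset_tracks_written_bytes() {
        let mut w = CountingWriter { inner: Vec::new(), offset: 0 };
        w.write_all(b"hello, ").unwrap();
        w.write_all(b"world").unwrap();
        assert_eq!(w.offset, 12);
        assert_eq!(w.inner.len(), 12);
    }
}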

/// Formats a byte count as a human-readable string (B, KB, MB, GB).
fn format_bytes(bytes: usize) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}
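
// A quick check of the threshold behavior above; the expected strings follow
// directly from the `{:.2}` formatting. Assumes a 64-bit target for the GB
// case so the literal does not overflow `usize`.
#[cfg(test)]
mod format_bytes_tests {
    use super::format_bytes;

    #[test]
    fn picks_the_largest_fitting_unit() {
        assert_eq!(format_bytes(512), "512 B");
        assert_eq!(format_bytes(2048), "2.00 KB");
        assert_eq!(format_bytes(5 * 1024 * 1024), "5.00 MB");
        assert_eq!(format_bytes(3 * 1024 * 1024 * 1024), "3.00 GB");
    }
}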

/// Returns the base doc-id offset of each segment: the prefix sums of the
/// segments' document counts, so a local doc id can be mapped to its merged id.
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}
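
// A sketch of the prefix-sum logic in `doc_offsets`, over plain counts
// instead of `SegmentReader`s (the counts here are made up for illustration):
// segments with 100, 250, and 75 docs get offsets [0, 100, 350], so local
// doc id `d` in segment `i` maps to merged id `offsets[i] + d`.
#[cfg(test)]
mod doc_offsets_sketch {
    // Mirrors the accumulation in `doc_offsets`, applied to raw counts.
    fn offsets_from_counts(counts: &[u32]) -> Vec<u32> {
        let mut offsets = Vec::with_capacity(counts.len());
        let mut acc = 0u32;
        for &c in counts {
            offsets.push(acc);
            acc += c;
        }
        offsets
    }

    #[test]
    fn prefix_sums() {
        assert_eq!(offsets_from_counts(&[100, 250, 75]), vec![0, 100, 350]);
    }
}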

/// Counters and byte sizes collected while merging segments.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed by the postings merge.
    pub terms_processed: usize,
    /// Bytes written to the term dictionary.
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file.
    pub postings_bytes: usize,
    /// Bytes written to the document store.
    pub store_bytes: usize,
    /// Bytes written to the dense vectors file.
    pub vectors_bytes: usize,
    /// Bytes written to the sparse vectors file.
    pub sparse_bytes: usize,
    /// Bytes written to the fast fields file.
    pub fast_bytes: usize,
}

impl std::fmt::Display for MergeStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
            self.terms_processed,
            format_bytes(self.term_dict_bytes),
            format_bytes(self.postings_bytes),
            format_bytes(self.store_bytes),
            format_bytes(self.vectors_bytes),
            format_bytes(self.sparse_bytes),
            format_bytes(self.fast_bytes),
        )
    }
}
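
// A check of the `Display` impl above; the expected string follows from the
// format string and the `format_bytes` thresholds.
#[cfg(test)]
mod merge_stats_display_tests {
    use super::MergeStats;

    #[test]
    fn display_renders_all_fields() {
        let stats = MergeStats {
            terms_processed: 1000,
            term_dict_bytes: 2048,
            postings_bytes: 5 * 1024 * 1024,
            store_bytes: 100,
            vectors_bytes: 0,
            sparse_bytes: 0,
            fast_bytes: 0,
        };
        assert_eq!(
            stats.to_string(),
            "terms=1000, term_dict=2.00 KB, postings=5.00 MB, store=100 B, \
             vectors=0 B, sparse=0 B, fast=0 B"
        );
    }
}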

pub use super::types::TrainedVectorStructures;

/// Merges multiple segments into a single new segment.
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merges `segments` into a new segment identified by `new_segment_id`,
    /// writing its files into `dir`. The postings, store, dense-vector,
    /// sparse-vector, and fast-field phases run concurrently. If `trained`
    /// vector structures are provided, they are used when merging dense
    /// vectors. Returns the new segment's metadata and per-phase statistics.
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let merge_start = std::time::Instant::now();

        // Postings phase: merge the term dictionary, postings, and positions
        // through offset-tracking writers so output sizes can be reported.
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // No positions were written: drop the writer and remove the
                // empty file (best effort).
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        // Store phase: merge the document stores.
        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        // Remaining phases: dense vectors, sparse vectors, and fast fields.
        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        // Run all phases concurrently; the first error aborts the merge.
        let (postings_result, store_result, vectors_bytes, (sparse_bytes, fast_bytes)) =
            tokio::try_join!(postings_fut, store_fut, dense_fut, async {
                tokio::try_join!(sparse_fut, fast_fut)
            })?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // Combine per-field statistics from all source segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();

        // Sanity check: the merged store must contain exactly as many
        // documents as the segment metadata claims.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
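
// A hedged usage sketch (comment-form, not compiled here): `dir`, `readers`,
// `schema`, and `new_id` are hypothetical values; any `Directory +
// DirectoryWriter` implementation from `crate::directories` should work.
//
//     let merger = SegmentMerger::new(schema);
//     let (meta, stats) = merger
//         .merge(&dir, &readers, SegmentId(new_id), /* trained */ None)
//         .await?;
//     log::info!("merged segment {:016x}: {}", meta.id, stats);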

/// Deletes all files belonging to `segment_id` from `dir`. Deletion is
/// best-effort: individual errors are ignored, since some files (for example
/// a positions file that was never written) may not exist for every segment.
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    // Issue all deletions concurrently and discard individual failures.
    let _ = tokio::join!(
        dir.delete(&files.term_dict),
        dir.delete(&files.postings),
        dir.delete(&files.store),
        dir.delete(&files.meta),
        dir.delete(&files.vectors),
        dir.delete(&files.sparse),
        dir.delete(&files.positions),
        dir.delete(&files.fast),
    );
    Ok(())
}