// hermes_core/segment/merger/mod.rs
mod dense;
4#[cfg(feature = "diagnostics")]
5mod diagnostics;
6mod fast_fields;
7mod postings;
8mod sparse;
9mod store;
10
11use std::sync::Arc;
12
13use rustc_hash::FxHashMap;
14
15use super::reader::SegmentReader;
16use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
17use super::{OffsetWriter, format_bytes};
18use crate::Result;
19use crate::directories::{Directory, DirectoryWriter};
20use crate::dsl::Schema;
21
22fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
24 let mut offsets = Vec::with_capacity(segments.len());
25 let mut acc = 0u32;
26 for seg in segments {
27 offsets.push(acc);
28 acc += seg.num_docs();
29 }
30 offsets
31}
32
/// Counters accumulated during a segment merge, used for logging/diagnostics.
///
/// All `*_bytes` fields are the number of bytes written to the corresponding
/// output file of the new segment.
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed while merging the postings lists.
    pub terms_processed: usize,
    /// Bytes written to the term dictionary file.
    pub term_dict_bytes: usize,
    /// Bytes written to the postings file.
    pub postings_bytes: usize,
    /// Bytes written to the document store file.
    pub store_bytes: usize,
    /// Bytes written by the dense-vector merge.
    pub vectors_bytes: usize,
    /// Bytes written by the sparse-vector merge.
    pub sparse_bytes: usize,
    /// Bytes written by the fast-fields merge.
    pub fast_bytes: usize,
}
51
52impl std::fmt::Display for MergeStats {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(
55 f,
56 "terms={}, term_dict={}, postings={}, store={}, vectors={}, sparse={}, fast={}",
57 self.terms_processed,
58 format_bytes(self.term_dict_bytes),
59 format_bytes(self.postings_bytes),
60 format_bytes(self.store_bytes),
61 format_bytes(self.vectors_bytes),
62 format_bytes(self.sparse_bytes),
63 format_bytes(self.fast_bytes),
64 )
65 }
66}
67
68pub use super::types::TrainedVectorStructures;
70
/// Merges several existing segments into one new segment.
pub struct SegmentMerger {
    // Schema of the index being merged; shared with the rest of the engine
    // via `Arc` so the merger never copies it.
    schema: Arc<Schema>,
}
75
impl SegmentMerger {
    /// Creates a merger for segments that all follow `schema`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merges `segments` into a single new segment identified by
    /// `new_segment_id`, streaming every component file through `dir`.
    ///
    /// The postings, store, dense-vector, sparse-vector, and fast-field
    /// phases run concurrently via `tokio::try_join!`, each writing to its
    /// own file. `trained`, when present, is forwarded to the dense-vector
    /// merge (logged as an "ANN merge").
    ///
    /// Returns the new segment's metadata together with the accumulated
    /// [`MergeStats`].
    ///
    /// # Errors
    /// Fails if any phase fails, or if the number of documents written to
    /// the store disagrees with the doc counts from the input segments'
    /// metadata (reported as `Error::Io` with `InvalidData`).
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        let merge_start = std::time::Instant::now();

        // Phase 1: merge term dictionary, postings, and (optional) positions.
        // Yields (terms_processed, term_dict_bytes, postings_bytes).
        let postings_fut = async {
            let mut postings_writer =
                OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
            let mut positions_writer =
                OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
            let mut term_dict_writer =
                OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

            let terms_processed = self
                .merge_postings(
                    segments,
                    &mut term_dict_writer,
                    &mut postings_writer,
                    &mut positions_writer,
                )
                .await?;

            // Capture offsets (= bytes written) before finishing the writers.
            let postings_bytes = postings_writer.offset() as usize;
            let term_dict_bytes = term_dict_writer.offset() as usize;
            let positions_bytes = positions_writer.offset();

            postings_writer.finish()?;
            term_dict_writer.finish()?;
            if positions_bytes > 0 {
                positions_writer.finish()?;
            } else {
                // No positions were written: drop the unfinished writer and
                // best-effort delete the empty positions file.
                drop(positions_writer);
                let _ = dir.delete(&files.positions).await;
            }
            log::info!(
                "[merge] postings done: {} terms, term_dict={}, postings={}, positions={}",
                terms_processed,
                format_bytes(term_dict_bytes),
                format_bytes(postings_bytes),
                format_bytes(positions_bytes as usize),
            );
            Ok::<(usize, usize, usize), crate::Error>((
                terms_processed,
                term_dict_bytes,
                postings_bytes,
            ))
        };

        // Phase 2: merge the document store. Yields (bytes, docs written),
        // the latter cross-checked against segment metadata below.
        let store_fut = async {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            let store_num_docs = self.merge_store(segments, &mut store_writer).await?;
            let bytes = store_writer.offset() as usize;
            store_writer.finish()?;
            Ok::<(usize, u32), crate::Error>((bytes, store_num_docs))
        };

        // Phases 3-5: dense vectors, sparse vectors, fast fields.
        let dense_fut = async {
            self.merge_dense_vectors(dir, segments, &files, trained)
                .await
        };

        let sparse_fut = async { self.merge_sparse_vectors(dir, segments, &files).await };

        let fast_fut = async { self.merge_fast_fields(dir, segments, &files).await };

        // Run all phases concurrently; sparse + fast are joined as a pair so
        // the outer try_join! stays within its supported arity.
        let (postings_result, store_result, vectors_bytes, (sparse_bytes, fast_bytes)) =
            tokio::try_join!(postings_fut, store_fut, dense_fut, async {
                tokio::try_join!(sparse_fut, fast_fut)
            })?;
        let (store_bytes, store_num_docs) = store_result;
        stats.terms_processed = postings_result.0;
        stats.term_dict_bytes = postings_result.1;
        stats.postings_bytes = postings_result.2;
        stats.store_bytes = store_bytes;
        stats.vectors_bytes = vectors_bytes;
        stats.sparse_bytes = sparse_bytes;
        stats.fast_bytes = fast_bytes;
        log::info!(
            "[merge] all phases done in {:.1}s: {}",
            merge_start.elapsed().as_secs_f64(),
            stats
        );

        // Sum per-field token/doc statistics across all input segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();

        // Consistency check: the store must contain exactly as many docs as
        // the input segments' metadata claims; otherwise the merged segment
        // would be corrupt, so abort before writing the meta file.
        if store_num_docs != total_docs {
            log::error!(
                "[merge] STORE/META MISMATCH: store has {} docs but metadata expects {}. \
                 Per-segment: {:?}",
                store_num_docs,
                total_docs,
                segments
                    .iter()
                    .map(|s| (
                        format!("{:016x}", s.meta().id),
                        s.num_docs(),
                        s.store().num_docs()
                    ))
                    .collect::<Vec<_>>()
            );
            return Err(crate::Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Store/meta doc count mismatch: store={}, meta={}",
                    store_num_docs, total_docs
                ),
            )));
        }

        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!("{} complete: {} docs, {}", label, total_docs, stats);

        Ok((meta, stats))
    }
}
242
243pub async fn delete_segment<D: Directory + DirectoryWriter>(
245 dir: &D,
246 segment_id: SegmentId,
247) -> Result<()> {
248 let files = SegmentFiles::new(segment_id.0);
249 let _ = tokio::join!(
250 dir.delete(&files.term_dict),
251 dir.delete(&files.postings),
252 dir.delete(&files.store),
253 dir.delete(&files.meta),
254 dir.delete(&files.vectors),
255 dir.delete(&files.sparse),
256 dir.delete(&files.positions),
257 dir.delete(&files.fast),
258 );
259 Ok(())
260}