mod dense_vectors;
mod sparse_vectors;

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io::Write;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter, StreamingWriter};
use crate::dsl::Schema;
use crate::structures::{
    BlockPostingList, PositionPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo,
};

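/// A [`StreamingWriter`] wrapper that tracks how many bytes have been written,
/// so callers can record absolute offsets and final file sizes.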
pub(crate) struct OffsetWriter {
    inner: Box<dyn StreamingWriter>,
    offset: u64,
}

impl OffsetWriter {
    fn new(inner: Box<dyn StreamingWriter>) -> Self {
        Self { inner, offset: 0 }
    }

    fn offset(&self) -> u64 {
        self.offset
    }

    fn finish(self) -> std::io::Result<()> {
        self.inner.finish()
    }
}

impl Write for OffsetWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.offset += n as u64;
        Ok(n)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

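/// Formats a byte count as a human-readable string (B, KB, MB, GB).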
fn format_bytes(bytes: usize) -> String {
    if bytes >= 1024 * 1024 * 1024 {
        format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= 1024 * 1024 {
        format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= 1024 {
        format!("{:.2} KB", bytes as f64 / 1024.0)
    } else {
        format!("{} B", bytes)
    }
}

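/// Computes each segment's doc-id offset: the running total of `num_docs`
/// over all preceding segments.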
fn doc_offsets(segments: &[SegmentReader]) -> Vec<u32> {
    let mut offsets = Vec::with_capacity(segments.len());
    let mut acc = 0u32;
    for seg in segments {
        offsets.push(acc);
        acc += seg.num_docs();
    }
    offsets
}

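/// Statistics collected while merging segments. The byte counts reflect the
/// size of each output: term dictionary, postings, document store, dense
/// vectors, and sparse vectors.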
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    pub terms_processed: usize,
    pub peak_memory_bytes: usize,
    pub current_memory_bytes: usize,
    pub term_dict_bytes: usize,
    pub postings_bytes: usize,
    pub store_bytes: usize,
    pub vectors_bytes: usize,
    pub sparse_bytes: usize,
}

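/// Heap entry for the k-way term-dictionary merge. Ordering is reversed on the
/// term key so that `BinaryHeap` (a max-heap) pops keys in ascending order.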
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        other.key.cmp(&self.key)
    }
}

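/// Pre-trained ANN structures (coarse centroids and product-quantization
/// codebooks) supplied to [`SegmentMerger::merge_with_ann`] for the
/// dense-vector merge.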
pub struct TrainedVectorStructures {
    pub centroids: rustc_hash::FxHashMap<u32, Arc<crate::structures::CoarseCentroids>>,
    pub codebooks: rustc_hash::FxHashMap<u32, Arc<crate::structures::PQCodebook>>,
}

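/// Merges multiple segments into a single new segment: inverted index,
/// document store, dense and sparse vector data, and per-field statistics.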
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

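    /// Merges `segments` into a new segment identified by `new_segment_id`.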
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, None).await
    }

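    /// Like [`Self::merge`], but passes pre-trained centroids and codebooks
    /// through to the dense-vector merge.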
    pub async fn merge_with_ann<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: &TrainedVectorStructures,
    ) -> Result<(SegmentMeta, MergeStats)> {
        self.merge_core(dir, segments, new_segment_id, Some(trained))
            .await
    }

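    /// Shared implementation behind [`Self::merge`] and [`Self::merge_with_ann`]:
    /// merges postings, the document store, vector data, and per-field stats,
    /// then writes the new segment's metadata. Returns the metadata together
    /// with the collected [`MergeStats`].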
    async fn merge_core<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
        trained: Option<&TrainedVectorStructures>,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Merge the inverted index: term dictionary, postings, and positions.
        let mut postings_writer = OffsetWriter::new(dir.streaming_writer(&files.postings).await?);
        let mut positions_writer = OffsetWriter::new(dir.streaming_writer(&files.positions).await?);
        let mut term_dict_writer = OffsetWriter::new(dir.streaming_writer(&files.term_dict).await?);

        let terms_processed = self
            .merge_postings(
                segments,
                &mut term_dict_writer,
                &mut postings_writer,
                &mut positions_writer,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.postings_bytes = postings_writer.offset() as usize;
        stats.term_dict_bytes = term_dict_writer.offset() as usize;
        let positions_bytes = positions_writer.offset();

        postings_writer.finish()?;
        term_dict_writer.finish()?;
        if positions_bytes > 0 {
            positions_writer.finish()?;
        } else {
            // Nothing wrote positions; drop the writer and remove the empty file.
            drop(positions_writer);
            let _ = dir.delete(&files.positions).await;
        }

        // Merge the document store, recompressing only segments that use a dictionary.
        {
            let mut store_writer = OffsetWriter::new(dir.streaming_writer(&files.store).await?);
            {
                let mut store_merger = StoreMerger::new(&mut store_writer);
                for segment in segments {
                    if segment.store_has_dict() {
                        store_merger
                            .append_store_recompressing(segment.store())
                            .await
                            .map_err(crate::Error::Io)?;
                    } else {
                        let raw_blocks = segment.store_raw_blocks();
                        let data_slice = segment.store_data_slice();
                        store_merger.append_store(data_slice, &raw_blocks).await?;
                    }
                }
                store_merger.finish()?;
            }
            stats.store_bytes = store_writer.offset() as usize;
            store_writer.finish()?;
        }

        // Merge dense and sparse vector data.
        let vectors_bytes = self
            .merge_dense_vectors(dir, segments, &files, trained)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        let sparse_bytes = self.merge_sparse_vectors(dir, segments, &files).await?;
        stats.sparse_bytes = sparse_bytes;

        // Combine per-field statistics from all source segments.
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        let label = if trained.is_some() {
            "ANN merge"
        } else {
            "Merge"
        };
        log::info!(
            "{} complete: {} docs, {} terms, term_dict={}, postings={}, store={}, vectors={}, sparse={}",
            label,
            total_docs,
            stats.terms_processed,
            format_bytes(stats.term_dict_bytes),
            format_bytes(stats.postings_bytes),
            format_bytes(stats.store_bytes),
            format_bytes(stats.vectors_bytes),
            format_bytes(stats.sparse_bytes),
        );

        Ok((meta, stats))
    }

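    /// Performs a k-way merge of the segments' term dictionaries using a binary
    /// heap. For each distinct term, the postings from every source segment are
    /// combined (with doc ids shifted by the segment's offset) and the merged
    /// term dictionary is written as an SSTable. Returns the number of terms
    /// processed.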
    async fn merge_postings(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut OffsetWriter,
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        let doc_offs = doc_offsets(segments);

        for (i, segment) in segments.iter().enumerate() {
            log::debug!("Prefetching term dict for segment {} ...", i);
            segment.prefetch_term_dict().await?;
        }

        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Seed the merge heap with the first term from each segment.
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offs[seg_idx],
                });
            }
        }

        // Merged (term, TermInfo) pairs, buffered until the SSTable is written.
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Pop the smallest key and collect every segment that contains it.
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator we just consumed from.
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offs[first.segment_idx],
                });
            }

            // Drain any other segments holding the same term.
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offs[entry.segment_idx],
                    });
                }
            }

            let term_info = self
                .merge_term(segments, &sources, postings_out, positions_out)
                .await?;

            term_results.push((current_key, term_info));
            terms_processed += 1;

            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        // Approximate buffer size: Vec capacity times tuple size; the key bytes
        // held on the heap are not counted.
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        log::info!(
            "[merge] complete: terms={}, segments={}, term_buffer={:.2} MB, postings={}, positions={}",
            terms_processed,
            segments.len(),
            results_mem as f64 / (1024.0 * 1024.0),
            format_bytes(postings_out.offset() as usize),
            format_bytes(positions_out.offset() as usize),
        );

        // Write the merged term dictionary as a single SSTable, in key order.
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

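    /// Merges the postings (and positions, if present) of a single term from
    /// one or more source segments, writes them to the output files, and
    /// returns the new [`TermInfo`]. When every source already stores an
    /// external block list, the blocks are concatenated directly; otherwise the
    /// postings are decoded and re-encoded. Small position-free lists may be
    /// inlined into the `TermInfo` itself.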
    async fn merge_term(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut OffsetWriter,
        positions_out: &mut OffsetWriter,
    ) -> Result<TermInfo> {
        // Process sources in ascending doc-offset order so merged doc ids stay sorted.
        let mut sorted: Vec<_> = sources.to_vec();
        sorted.sort_by_key(|(_, _, off)| *off);

        let any_positions = sorted.iter().any(|(_, ti, _)| ti.position_info().is_some());
        let all_external = sorted.iter().all(|(_, ti, _)| ti.external_info().is_some());

        let (posting_offset, posting_len, doc_count) = if all_external && sorted.len() > 1 {
            // Fast path: every source already has an external block posting list,
            // so the blocks can be concatenated without re-encoding postings.
            let mut block_sources = Vec::with_capacity(sorted.len());
            for (seg_idx, ti, doc_off) in &sorted {
                let (off, len) = ti.external_info().unwrap();
                let bytes = segments[*seg_idx].read_postings(off, len).await?;
                let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                block_sources.push((bpl, *doc_off));
            }
            let merged = BlockPostingList::concatenate_blocks(&block_sources)?;
            let offset = postings_out.offset();
            let mut buf = Vec::new();
            merged.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        } else {
            // Slow path: decode each source (inline or external) and rebuild the
            // list with doc ids shifted by the segment's offset.
            let mut merged = PostingList::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((ids, tfs)) = ti.decode_inline() {
                    for (id, tf) in ids.into_iter().zip(tfs) {
                        merged.add(id + doc_off, tf);
                    }
                } else {
                    let (off, len) = ti.external_info().unwrap();
                    let bytes = segments[*seg_idx].read_postings(off, len).await?;
                    let bpl = BlockPostingList::deserialize(&mut bytes.as_slice())?;
                    let mut it = bpl.iterator();
                    while it.doc() != TERMINATED {
                        merged.add(it.doc() + doc_off, it.term_freq());
                        it.advance();
                    }
                }
            }
            // Small lists without positions can be stored inline in the TermInfo.
            if !any_positions {
                let ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
                let tfs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();
                if let Some(inline) = TermInfo::try_inline(&ids, &tfs) {
                    return Ok(inline);
                }
            }
            let offset = postings_out.offset();
            let block = BlockPostingList::from_posting_list(&merged)?;
            let mut buf = Vec::new();
            block.serialize(&mut buf)?;
            postings_out.write_all(&buf)?;
            (offset, buf.len() as u32, merged.doc_count())
        };

        // Merge position data if any source carries it.
        if any_positions {
            let mut pos_sources = Vec::new();
            for (seg_idx, ti, doc_off) in &sorted {
                if let Some((pos_off, pos_len)) = ti.position_info()
                    && let Some(bytes) = segments[*seg_idx]
                        .read_position_bytes(pos_off, pos_len)
                        .await?
                {
                    let pl = PositionPostingList::deserialize(&mut bytes.as_slice())
                        .map_err(crate::Error::Io)?;
                    pos_sources.push((pl, *doc_off));
                }
            }
            if !pos_sources.is_empty() {
                let merged = PositionPostingList::concatenate_blocks(&pos_sources)
                    .map_err(crate::Error::Io)?;
                let offset = positions_out.offset();
                let mut buf = Vec::new();
                merged.serialize(&mut buf).map_err(crate::Error::Io)?;
                positions_out.write_all(&buf)?;
                return Ok(TermInfo::external_with_positions(
                    posting_offset,
                    posting_len,
                    doc_count,
                    offset,
                    buf.len() as u32,
                ));
            }
        }

        Ok(TermInfo::external(posting_offset, posting_len, doc_count))
    }
}

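/// Deletes all files belonging to `segment_id`, ignoring individual delete
/// failures (e.g. files that were never created).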
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    let _ = dir.delete(&files.sparse).await;
    let _ = dir.delete(&files.positions).await;
    Ok(())
}