1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
//! Postings merge via streaming k-way merge.
//!
//! Uses a min-heap to merge terms from all segments in sorted order
//! without loading all terms into memory at once.
//!
//! Optimization: For terms that exist in only one segment, we copy the
//! posting data directly without decode/encode. Only terms that exist
//! in multiple segments need full merge.
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::io::Write;
use super::OffsetWriter;
use super::SegmentMerger;
use super::doc_offsets;
use crate::Result;
use crate::segment::reader::SegmentReader;
use crate::structures::{
BlockPostingList, PositionPostingList, PostingList, SSTableWriter, TERMINATED, TermInfo,
};
/// Entry for k-way merge heap.
///
/// One entry per segment currently at the heap frontier; ordering is by
/// `key` only (see the `Ord` impl), reversed so the smallest term pops first.
struct MergeEntry {
/// Term bytes — the sort key for the k-way merge.
key: Vec<u8>,
/// This segment's posting metadata for `key`.
term_info: TermInfo,
/// Index into the `segments` slice this entry came from.
segment_idx: usize,
/// Base added to this segment's local doc IDs in the merged segment.
doc_offset: u32,
}
// Equality and ordering consider only `key`: two entries for the same term
// from different segments compare equal, which is exactly what the merge
// loop's `peek()`-based same-key draining relies on.
impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key.eq(&other.key)
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap; reversing the natural key order turns it
        // into a min-heap so the lexicographically smallest term pops first.
        self.key.cmp(&other.key).reverse()
    }
}
impl SegmentMerger {
/// Merge postings from multiple segments using streaming k-way merge
///
/// SSTable entries are written inline during the merge loop (no buffering).
/// This is possible because SSTableWriter<W> is Send when W is Send.
///
/// Returns the number of terms processed.
pub(super) async fn merge_postings(
&self,
segments: &[SegmentReader],
term_dict: &mut OffsetWriter,
postings_out: &mut OffsetWriter,
positions_out: &mut OffsetWriter,
) -> Result<usize> {
let doc_offs = doc_offsets(segments)?;
// Parallel prefetch all term dict blocks
let prefetch_start = std::time::Instant::now();
let mut futs = Vec::with_capacity(segments.len());
for segment in segments.iter() {
futs.push(segment.prefetch_term_dict());
}
let results = futures::future::join_all(futs).await;
for (i, res) in results.into_iter().enumerate() {
res.map_err(|e| {
log::error!("Prefetch failed for segment {}: {}", i, e);
e
})?;
}
log::debug!(
"Prefetched {} term dicts in {:.1}s",
segments.len(),
prefetch_start.elapsed().as_secs_f64()
);
// Create iterators for each segment's term dictionary
let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();
// Initialize min-heap with first entry from each segment
let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
for (seg_idx, iter) in iterators.iter_mut().enumerate() {
if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
heap.push(MergeEntry {
key,
term_info,
segment_idx: seg_idx,
doc_offset: doc_offs[seg_idx],
});
}
}
// Write SSTable entries inline — no buffering needed since
// SSTableWriter<&mut OffsetWriter> is Send (OffsetWriter is Send).
let mut term_dict_writer = SSTableWriter::<&mut OffsetWriter, TermInfo>::new(term_dict);
let mut terms_processed = 0usize;
let mut serialize_buf: Vec<u8> = Vec::new();
// Pre-allocate sources buffer outside loop — reused for every term
let mut sources: Vec<(usize, TermInfo, u32)> = Vec::with_capacity(segments.len());
while !heap.is_empty() {
// Get the smallest key (move, not clone)
let first = heap.pop().unwrap();
let current_key = first.key;
// Collect all entries with the same key
sources.clear();
sources.push((first.segment_idx, first.term_info, first.doc_offset));
// Advance the iterator that provided this entry
if let Some((key, term_info)) = iterators[first.segment_idx]
.next()
.await
.map_err(crate::Error::from)?
{
heap.push(MergeEntry {
key,
term_info,
segment_idx: first.segment_idx,
doc_offset: doc_offs[first.segment_idx],
});
}
// Check if other segments have the same key
while let Some(entry) = heap.peek() {
if entry.key != current_key {
break;
}
let entry = heap.pop().unwrap();
sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));
// Advance this iterator too
if let Some((key, term_info)) = iterators[entry.segment_idx]
.next()
.await
.map_err(crate::Error::from)?
{
heap.push(MergeEntry {
key,
term_info,
segment_idx: entry.segment_idx,
doc_offset: doc_offs[entry.segment_idx],
});
}
}
// Process this term (handles both single-source and multi-source)
let term_info = self
.merge_term(
segments,
&mut sources,
postings_out,
positions_out,
&mut serialize_buf,
)
.await?;
// Write directly to SSTable (no buffering)
term_dict_writer
.insert(¤t_key, &term_info)
.map_err(crate::Error::Io)?;
terms_processed += 1;
// Log progress every 100k terms
if terms_processed.is_multiple_of(100_000) {
log::debug!("Merge progress: {} terms processed", terms_processed);
}
}
term_dict_writer.finish().map_err(crate::Error::Io)?;
Ok(terms_processed)
}
/// Merge a single term's postings + positions from one or more source segments.
///
/// `sources` holds one `(segment_idx, term_info, doc_id_offset)` triple per
/// segment that contains this term; it is sorted in place by doc offset so
/// concatenation preserves ascending doc-ID order.
///
/// Fast path: when all sources are external and there are multiple,
/// uses block-level concatenation (O(blocks) instead of O(postings)).
/// Otherwise: full decode → remap doc IDs → re-encode.
///
/// `buf` is a caller-owned scratch buffer reused across terms to avoid a
/// per-term allocation in the slow path.
///
/// Returns the `TermInfo` to store in the merged term dictionary: inline when
/// the merged postings fit and no positions exist, external otherwise.
async fn merge_term(
&self,
segments: &[SegmentReader],
sources: &mut [(usize, TermInfo, u32)],
postings_out: &mut OffsetWriter,
positions_out: &mut OffsetWriter,
buf: &mut Vec<u8>,
) -> Result<TermInfo> {
// Ascending doc-offset order is required for streaming concatenation below.
sources.sort_by_key(|(_, _, off)| *off);
let any_positions = sources
.iter()
.any(|(_, ti, _)| ti.position_info().is_some());
let all_external = sources
.iter()
.all(|(_, ti, _)| ti.external_info().is_some());
// === Merge postings ===
let (posting_offset, posting_len, doc_count) = if all_external && sources.len() > 1 {
// Fast path: streaming merge (blocks → output writer, no buffering)
// Read all segments' postings in parallel
let read_futs: Vec<_> = sources
.iter()
.map(|(seg_idx, ti, doc_off)| {
let (off, len) = ti.external_info().unwrap();
let seg = &segments[*seg_idx];
let doc_off = *doc_off;
async move {
let bytes = seg.read_postings(off, len).await?;
Ok::<_, crate::Error>((bytes, doc_off))
}
})
.collect();
let raw_sources: Vec<(Vec<u8>, u32)> = futures::future::try_join_all(read_futs).await?;
// Borrow the raw bytes alongside each segment's doc-ID base for the
// streaming concatenation API.
let refs: Vec<(&[u8], u32)> = raw_sources
.iter()
.map(|(b, off)| (b.as_slice(), *off))
.collect();
// Record where this term's postings start in the output stream.
let offset = postings_out.offset();
let (doc_count, bytes_written) =
BlockPostingList::concatenate_streaming(&refs, postings_out)?;
(offset, bytes_written as u64, doc_count)
} else {
// Decode all sources into a flat PostingList, remap doc IDs
let mut merged = PostingList::new();
for (seg_idx, ti, doc_off) in sources.iter() {
if let Some((ids, tfs)) = ti.decode_inline() {
// Inline postings: doc IDs and term freqs decoded directly from TermInfo.
for (id, tf) in ids.into_iter().zip(tfs) {
merged.add(id + doc_off, tf);
}
} else {
// External postings: fetch, deserialize, and walk the block list.
let (off, len) = ti.external_info().unwrap();
let bytes = segments[*seg_idx].read_postings(off, len).await?;
let bpl = BlockPostingList::deserialize(&bytes)?;
let mut it = bpl.iterator();
while it.doc() != TERMINATED {
merged.add(it.doc() + doc_off, it.term_freq());
it.advance();
}
}
}
// Try to inline (only when no positions)
if !any_positions
&& let Some(inline) = TermInfo::try_inline_iter(
merged.doc_count() as usize,
merged.iter().map(|p| (p.doc_id, p.term_freq)),
)
{
// Small enough to store inline in the term dictionary — no external write.
return Ok(inline);
}
// Too large to inline: re-encode as a block posting list and append it
// to the postings output via the reusable scratch buffer.
let offset = postings_out.offset();
let block = BlockPostingList::from_posting_list(&merged)?;
buf.clear();
block.serialize(buf)?;
postings_out.write_all(buf)?;
(offset, buf.len() as u64, merged.doc_count())
};
// === Merge positions (if any source has them) ===
if any_positions {
// Read all position data in parallel
let pos_futs: Vec<_> = sources
.iter()
.filter_map(|(seg_idx, ti, doc_off)| {
// Skip sources without position data; only position-bearing
// sources contribute futures.
let (pos_off, pos_len) = ti.position_info()?;
let seg = &segments[*seg_idx];
let doc_off = *doc_off;
Some(async move {
match seg.read_position_bytes(pos_off, pos_len).await? {
Some(bytes) => Ok::<_, crate::Error>(Some((bytes, doc_off))),
None => Ok(None),
}
})
})
.collect();
// Drop None results (position_info present but no bytes readable).
let raw_pos: Vec<(Vec<u8>, u32)> = futures::future::try_join_all(pos_futs)
.await?
.into_iter()
.flatten()
.collect();
if !raw_pos.is_empty() {
let refs: Vec<(&[u8], u32)> = raw_pos
.iter()
.map(|(b, off)| (b.as_slice(), *off))
.collect();
let offset = positions_out.offset();
let (_doc_count, bytes_written) =
PositionPostingList::concatenate_streaming(&refs, positions_out)
.map_err(crate::Error::Io)?;
// Positions were written: return a TermInfo that references both
// the posting and the position extents.
return Ok(TermInfo::external_with_positions(
posting_offset,
posting_len,
doc_count,
offset,
bytes_written as u64,
));
}
}
// No positions anywhere for this term: postings-only external TermInfo.
Ok(TermInfo::external(posting_offset, posting_len, doc_count))
}
}