1use std::cmp::Ordering;
44use std::marker::PhantomData;
45
46#[derive(Debug, Clone)]
48#[allow(dead_code)]
49struct LoserNode {
50 loser: usize,
52 valid: bool,
54}
55
56impl Default for LoserNode {
57 fn default() -> Self {
58 Self {
59 loser: usize::MAX,
60 valid: false,
61 }
62 }
63}
64
65pub struct TournamentTree<I, T>
83where
84 I: Iterator<Item = T>,
85 T: Ord + Clone,
86{
87 tree: Vec<LoserNode>,
90 iters: Vec<std::iter::Peekable<I>>,
92 winner: usize,
94 k: usize,
96 _phantom: PhantomData<T>,
98}
99
100impl<I, T> TournamentTree<I, T>
101where
102 I: Iterator<Item = T>,
103 T: Ord + Clone,
104{
105 pub fn new(iters: Vec<I>) -> Self {
109 let k = iters.len();
110 if k == 0 {
111 return Self {
112 tree: vec![],
113 iters: vec![],
114 winner: usize::MAX,
115 k: 0,
116 _phantom: PhantomData,
117 };
118 }
119
120 let iters: Vec<_> = iters.into_iter().map(|it| it.peekable()).collect();
122
123 let tree = vec![LoserNode::default(); k];
125
126 let mut this = Self {
127 tree,
128 iters,
129 winner: 0,
130 k,
131 _phantom: PhantomData,
132 };
133
134 this.build();
136 this
137 }
138
139 fn build(&mut self) {
141 if self.k == 0 {
142 self.winner = usize::MAX;
143 return;
144 }
145
146 let mut valid_sources: Vec<usize> = Vec::with_capacity(self.k);
152 for idx in 0..self.k {
153 if self.iters[idx].peek().is_some() {
154 valid_sources.push(idx);
155 }
156 }
157
158 if valid_sources.is_empty() {
159 self.winner = usize::MAX;
160 return;
161 }
162
163 let mut winner = valid_sources[0];
165
166 for &idx in &valid_sources[1..] {
167 if self.compare_sources(idx, winner) == std::cmp::Ordering::Less {
169 let node_idx = (self.k + winner) / 2;
171 if node_idx < self.tree.len() {
172 self.tree[node_idx] = LoserNode {
173 loser: winner,
174 valid: true,
175 };
176 }
177 winner = idx;
178 } else {
179 let node_idx = (self.k + idx) / 2;
181 if node_idx < self.tree.len() {
182 self.tree[node_idx] = LoserNode {
183 loser: idx,
184 valid: true,
185 };
186 }
187 }
188 }
189
190 self.winner = winner;
191 }
192
193 fn compare_sources(&mut self, source_a: usize, source_b: usize) -> std::cmp::Ordering {
196 let key_a = self.iters[source_a].peek().cloned();
198 let key_b = self.iters[source_b].peek().cloned();
200
201 match (key_a, key_b) {
202 (None, None) => std::cmp::Ordering::Equal,
203 (None, Some(_)) => std::cmp::Ordering::Greater, (Some(_), None) => std::cmp::Ordering::Less,
205 (Some(a), Some(b)) => a.cmp(&b),
206 }
207 }
208
209 #[allow(dead_code)]
212 fn set_loser_on_path(&mut self, loser_idx: usize, _winner_idx: usize) {
213 if self.k == 0 {
214 return;
215 }
216
217 let node_idx = (self.k + loser_idx) / 2;
220 if node_idx < self.tree.len() {
221 self.tree[node_idx] = LoserNode {
222 loser: loser_idx,
223 valid: true,
224 };
225 }
226 }
227
228 pub fn pop(&mut self) -> Option<(usize, T)> {
233 if self.winner == usize::MAX || self.k == 0 {
234 return None;
235 }
236
237 let value = self.iters[self.winner].next()?;
239 let old_winner = self.winner;
240
241 self.replay(old_winner);
243
244 Some((old_winner, value))
245 }
246
247 fn replay(&mut self, changed_idx: usize) {
251 if self.k <= 1 {
252 if self.k == 1 && self.iters[0].peek().is_none() {
254 self.winner = usize::MAX;
255 }
256 return;
257 }
258
259 let changed_exhausted = self.iters[changed_idx].peek().is_none();
261
262 if changed_exhausted {
263 self.rebuild();
265 return;
266 }
267
268 let mut winner = changed_idx;
271
272 for idx in 0..self.k {
273 if idx == winner {
274 continue;
275 }
276
277 if self.compare_sources(idx, winner) == std::cmp::Ordering::Less {
278 winner = idx;
279 }
280 }
281
282 self.winner = winner;
283 }
284
285 fn rebuild(&mut self) {
287 self.build();
288 }
289
290 pub fn peek(&mut self) -> Option<(usize, &T)> {
292 if self.winner == usize::MAX || self.k == 0 {
293 return None;
294 }
295 self.iters[self.winner].peek().map(|v| (self.winner, v))
296 }
297
298 pub fn is_empty(&self) -> bool {
300 self.winner == usize::MAX
301 }
302
303 pub fn source_count(&self) -> usize {
305 self.k
306 }
307}
308
309pub struct MergeIterator<I, T>
314where
315 I: Iterator<Item = T>,
316 T: Ord + Clone,
317{
318 tree: TournamentTree<I, T>,
319 last_key: Option<T>,
321 deduplicate: bool,
323}
324
325impl<I, T> MergeIterator<I, T>
326where
327 I: Iterator<Item = T>,
328 T: Ord + Clone,
329{
330 pub fn new(iters: Vec<I>, deduplicate: bool) -> Self {
332 Self {
333 tree: TournamentTree::new(iters),
334 last_key: None,
335 deduplicate,
336 }
337 }
338}
339
340impl<I, T> Iterator for MergeIterator<I, T>
341where
342 I: Iterator<Item = T>,
343 T: Ord + Clone,
344{
345 type Item = (usize, T);
346
347 fn next(&mut self) -> Option<Self::Item> {
348 loop {
349 let (source, value) = self.tree.pop()?;
350
351 if self.deduplicate {
352 if let Some(ref last) = self.last_key {
354 if &value == last {
355 continue;
356 }
357 }
358 self.last_key = Some(value.clone());
359 }
360
361 return Some((source, value));
362 }
363 }
364}
365
366use crate::tiered_memtable::HotEntry;
371
372#[derive(Clone)]
374pub struct KeyedEntry {
375 pub entry: HotEntry,
376}
377
378impl PartialEq for KeyedEntry {
379 fn eq(&self, other: &Self) -> bool {
380 self.entry.key.as_slice() == other.entry.key.as_slice()
381 }
382}
383
384impl Eq for KeyedEntry {}
385
386impl PartialOrd for KeyedEntry {
387 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
388 Some(self.cmp(other))
389 }
390}
391
392impl Ord for KeyedEntry {
393 fn cmp(&self, other: &Self) -> Ordering {
394 self.entry.key.as_slice().cmp(other.entry.key.as_slice())
395 }
396}
397
398pub struct HotEntryMerger {
405 tree: TournamentTree<std::vec::IntoIter<KeyedEntry>, KeyedEntry>,
406 last_key: Option<Vec<u8>>,
407}
408
409impl HotEntryMerger {
410 pub fn new(sources: Vec<Vec<HotEntry>>) -> Self {
414 let iters: Vec<_> = sources
415 .into_iter()
416 .map(|v| v.into_iter().map(|e| KeyedEntry { entry: e }).collect::<Vec<_>>().into_iter())
417 .collect();
418
419 Self {
420 tree: TournamentTree::new(iters),
421 last_key: None,
422 }
423 }
424
425 pub fn next_unique(&mut self) -> Option<(usize, HotEntry)> {
427 loop {
428 let (source, keyed) = self.tree.pop()?;
429
430 if let Some(ref last) = self.last_key {
432 if keyed.entry.key.as_slice() == last.as_slice() {
433 continue;
434 }
435 }
436
437 self.last_key = Some(keyed.entry.key.to_vec());
438 return Some((source, keyed.entry));
439 }
440 }
441}
442
443impl Iterator for HotEntryMerger {
444 type Item = (usize, HotEntry);
445
446 fn next(&mut self) -> Option<Self::Item> {
447 self.next_unique()
448 }
449}
450
451#[cfg(test)]
456mod tests {
457 use super::*;
458
459 #[test]
460 fn test_tournament_tree_basic() {
461 let sources: Vec<Vec<i32>> = vec![
462 vec![1, 4, 7, 10],
463 vec![2, 5, 8, 11],
464 vec![3, 6, 9, 12],
465 ];
466
467 let iters = sources.into_iter().map(|v| v.into_iter());
468 let mut tree = TournamentTree::new(iters.collect());
469
470 let mut result = Vec::new();
471 while let Some((_, val)) = tree.pop() {
472 result.push(val);
473 }
474
475 assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
476 }
477
478 #[test]
479 fn test_tournament_tree_uneven() {
480 let sources: Vec<Vec<i32>> = vec![
481 vec![1, 10],
482 vec![2, 3, 4, 5],
483 vec![6],
484 ];
485
486 let iters = sources.into_iter().map(|v| v.into_iter());
487 let mut tree = TournamentTree::new(iters.collect());
488
489 let mut result = Vec::new();
490 while let Some((_, val)) = tree.pop() {
491 result.push(val);
492 }
493
494 assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 10]);
495 }
496
497 #[test]
498 fn test_tournament_tree_single() {
499 let sources: Vec<Vec<i32>> = vec![
500 vec![1, 2, 3],
501 ];
502
503 let iters = sources.into_iter().map(|v| v.into_iter());
504 let mut tree = TournamentTree::new(iters.collect());
505
506 let mut result = Vec::new();
507 while let Some((_, val)) = tree.pop() {
508 result.push(val);
509 }
510
511 assert_eq!(result, vec![1, 2, 3]);
512 }
513
514 #[test]
515 fn test_tournament_tree_empty() {
516 let sources: Vec<Vec<i32>> = vec![];
517 let iters = sources.into_iter().map(|v| v.into_iter());
518 let mut tree = TournamentTree::new(iters.collect());
519
520 assert!(tree.pop().is_none());
521 assert!(tree.is_empty());
522 }
523
524 #[test]
525 fn test_tournament_tree_with_duplicates() {
526 let sources: Vec<Vec<i32>> = vec![
527 vec![1, 3, 5],
528 vec![1, 2, 4], vec![2, 3, 6], ];
531
532 let iters = sources.into_iter().map(|v| v.into_iter());
533 let tree = TournamentTree::new(iters.collect());
534
535 let merge_iter = MergeIterator {
537 tree,
538 last_key: None,
539 deduplicate: false,
540 };
541 let result: Vec<_> = merge_iter.map(|(_, v)| v).collect();
542 assert_eq!(result, vec![1, 1, 2, 2, 3, 3, 4, 5, 6]);
543 }
544
545 #[test]
546 fn test_merge_iterator_deduplicate() {
547 let sources: Vec<Vec<i32>> = vec![
548 vec![1, 3, 5],
549 vec![1, 2, 4],
550 vec![2, 3, 6],
551 ];
552
553 let iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
554 let merge_iter = MergeIterator::new(iters, true);
555
556 let result: Vec<_> = merge_iter.map(|(_, v)| v).collect();
557 assert_eq!(result, vec![1, 2, 3, 4, 5, 6]);
558 }
559
560 #[test]
561 fn test_source_tracking() {
562 let sources: Vec<Vec<i32>> = vec![
563 vec![2, 5],
564 vec![1, 4],
565 vec![3, 6],
566 ];
567
568 let iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
569 let mut tree = TournamentTree::new(iters);
570
571 let mut results = Vec::new();
572 while let Some((source, val)) = tree.pop() {
573 results.push((source, val));
574 }
575
576 assert_eq!(results[0], (1, 1)); assert_eq!(results[1], (0, 2)); assert_eq!(results[2], (2, 3)); }
581
582 #[test]
583 fn test_peek() {
584 let sources: Vec<Vec<i32>> = vec![
585 vec![2, 4],
586 vec![1, 3],
587 ];
588
589 let iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
590 let mut tree = TournamentTree::new(iters);
591
592 assert_eq!(tree.peek(), Some((1, &1)));
594 assert_eq!(tree.peek(), Some((1, &1)));
595
596 assert_eq!(tree.pop(), Some((1, 1)));
598 assert_eq!(tree.peek(), Some((0, &2)));
599 }
600}