1use std::cmp::Ordering;
47use std::marker::PhantomData;
48
/// One slot of the tournament tree's node array.
///
/// A slot records the index of a source that lost a comparison, plus a flag telling
/// whether the slot has been written since it was created.
#[derive(Debug, Clone)]
// The fields are bookkeeping that is stored but not necessarily read back.
#[allow(dead_code)]
struct LoserNode {
    /// Index of the losing source; `usize::MAX` while the slot is unset.
    loser: usize,
    /// `true` once a real loser has been stored in this slot.
    valid: bool,
}

impl Default for LoserNode {
    /// Produces an unset slot: no loser (`usize::MAX`) and `valid == false`.
    fn default() -> Self {
        LoserNode {
            valid: false,
            loser: usize::MAX,
        }
    }
}
67
68pub struct TournamentTree<I, T>
86where
87 I: Iterator<Item = T>,
88 T: Ord + Clone,
89{
90 tree: Vec<LoserNode>,
93 iters: Vec<std::iter::Peekable<I>>,
95 winner: usize,
97 k: usize,
99 _phantom: PhantomData<T>,
101}
102
103impl<I, T> TournamentTree<I, T>
104where
105 I: Iterator<Item = T>,
106 T: Ord + Clone,
107{
108 pub fn new(iters: Vec<I>) -> Self {
112 let k = iters.len();
113 if k == 0 {
114 return Self {
115 tree: vec![],
116 iters: vec![],
117 winner: usize::MAX,
118 k: 0,
119 _phantom: PhantomData,
120 };
121 }
122
123 let iters: Vec<_> = iters.into_iter().map(|it| it.peekable()).collect();
125
126 let tree = vec![LoserNode::default(); k];
128
129 let mut this = Self {
130 tree,
131 iters,
132 winner: 0,
133 k,
134 _phantom: PhantomData,
135 };
136
137 this.build();
139 this
140 }
141
142 fn build(&mut self) {
144 if self.k == 0 {
145 self.winner = usize::MAX;
146 return;
147 }
148
149 let mut valid_sources: Vec<usize> = Vec::with_capacity(self.k);
155 for idx in 0..self.k {
156 if self.iters[idx].peek().is_some() {
157 valid_sources.push(idx);
158 }
159 }
160
161 if valid_sources.is_empty() {
162 self.winner = usize::MAX;
163 return;
164 }
165
166 let mut winner = valid_sources[0];
168
169 for &idx in &valid_sources[1..] {
170 if self.compare_sources(idx, winner) == std::cmp::Ordering::Less {
172 let node_idx = (self.k + winner) / 2;
174 if node_idx < self.tree.len() {
175 self.tree[node_idx] = LoserNode {
176 loser: winner,
177 valid: true,
178 };
179 }
180 winner = idx;
181 } else {
182 let node_idx = (self.k + idx) / 2;
184 if node_idx < self.tree.len() {
185 self.tree[node_idx] = LoserNode {
186 loser: idx,
187 valid: true,
188 };
189 }
190 }
191 }
192
193 self.winner = winner;
194 }
195
196 fn compare_sources(&mut self, source_a: usize, source_b: usize) -> std::cmp::Ordering {
199 let key_a = self.iters[source_a].peek().cloned();
201 let key_b = self.iters[source_b].peek().cloned();
203
204 match (key_a, key_b) {
205 (None, None) => std::cmp::Ordering::Equal,
206 (None, Some(_)) => std::cmp::Ordering::Greater, (Some(_), None) => std::cmp::Ordering::Less,
208 (Some(a), Some(b)) => a.cmp(&b),
209 }
210 }
211
212 #[allow(dead_code)]
215 fn set_loser_on_path(&mut self, loser_idx: usize, _winner_idx: usize) {
216 if self.k == 0 {
217 return;
218 }
219
220 let node_idx = (self.k + loser_idx) / 2;
223 if node_idx < self.tree.len() {
224 self.tree[node_idx] = LoserNode {
225 loser: loser_idx,
226 valid: true,
227 };
228 }
229 }
230
231 pub fn pop(&mut self) -> Option<(usize, T)> {
236 if self.winner == usize::MAX || self.k == 0 {
237 return None;
238 }
239
240 let value = self.iters[self.winner].next()?;
242 let old_winner = self.winner;
243
244 self.replay(old_winner);
246
247 Some((old_winner, value))
248 }
249
250 fn replay(&mut self, changed_idx: usize) {
254 if self.k <= 1 {
255 if self.k == 1 && self.iters[0].peek().is_none() {
257 self.winner = usize::MAX;
258 }
259 return;
260 }
261
262 let changed_exhausted = self.iters[changed_idx].peek().is_none();
264
265 if changed_exhausted {
266 self.rebuild();
268 return;
269 }
270
271 let mut winner = changed_idx;
274
275 for idx in 0..self.k {
276 if idx == winner {
277 continue;
278 }
279
280 if self.compare_sources(idx, winner) == std::cmp::Ordering::Less {
281 winner = idx;
282 }
283 }
284
285 self.winner = winner;
286 }
287
288 fn rebuild(&mut self) {
290 self.build();
291 }
292
293 pub fn peek(&mut self) -> Option<(usize, &T)> {
295 if self.winner == usize::MAX || self.k == 0 {
296 return None;
297 }
298 self.iters[self.winner].peek().map(|v| (self.winner, v))
299 }
300
301 pub fn is_empty(&self) -> bool {
303 self.winner == usize::MAX
304 }
305
306 pub fn source_count(&self) -> usize {
308 self.k
309 }
310}
311
312pub struct MergeIterator<I, T>
317where
318 I: Iterator<Item = T>,
319 T: Ord + Clone,
320{
321 tree: TournamentTree<I, T>,
322 last_key: Option<T>,
324 deduplicate: bool,
326}
327
328impl<I, T> MergeIterator<I, T>
329where
330 I: Iterator<Item = T>,
331 T: Ord + Clone,
332{
333 pub fn new(iters: Vec<I>, deduplicate: bool) -> Self {
335 Self {
336 tree: TournamentTree::new(iters),
337 last_key: None,
338 deduplicate,
339 }
340 }
341}
342
343impl<I, T> Iterator for MergeIterator<I, T>
344where
345 I: Iterator<Item = T>,
346 T: Ord + Clone,
347{
348 type Item = (usize, T);
349
350 fn next(&mut self) -> Option<Self::Item> {
351 loop {
352 let (source, value) = self.tree.pop()?;
353
354 if self.deduplicate {
355 if let Some(ref last) = self.last_key {
357 if &value == last {
358 continue;
359 }
360 }
361 self.last_key = Some(value.clone());
362 }
363
364 return Some((source, value));
365 }
366 }
367}
368
369use crate::tiered_memtable::HotEntry;
374
375#[derive(Clone)]
377pub struct KeyedEntry {
378 pub entry: HotEntry,
379}
380
381impl PartialEq for KeyedEntry {
382 fn eq(&self, other: &Self) -> bool {
383 self.entry.key.as_slice() == other.entry.key.as_slice()
384 }
385}
386
387impl Eq for KeyedEntry {}
388
389impl PartialOrd for KeyedEntry {
390 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
391 Some(self.cmp(other))
392 }
393}
394
395impl Ord for KeyedEntry {
396 fn cmp(&self, other: &Self) -> Ordering {
397 self.entry.key.as_slice().cmp(other.entry.key.as_slice())
398 }
399}
400
401pub struct HotEntryMerger {
408 tree: TournamentTree<std::vec::IntoIter<KeyedEntry>, KeyedEntry>,
409 last_key: Option<Vec<u8>>,
410}
411
412impl HotEntryMerger {
413 pub fn new(sources: Vec<Vec<HotEntry>>) -> Self {
417 let iters: Vec<_> = sources
418 .into_iter()
419 .map(|v| v.into_iter().map(|e| KeyedEntry { entry: e }).collect::<Vec<_>>().into_iter())
420 .collect();
421
422 Self {
423 tree: TournamentTree::new(iters),
424 last_key: None,
425 }
426 }
427
428 pub fn next_unique(&mut self) -> Option<(usize, HotEntry)> {
430 loop {
431 let (source, keyed) = self.tree.pop()?;
432
433 if let Some(ref last) = self.last_key {
435 if keyed.entry.key.as_slice() == last.as_slice() {
436 continue;
437 }
438 }
439
440 self.last_key = Some(keyed.entry.key.to_vec());
441 return Some((source, keyed.entry));
442 }
443 }
444}
445
446impl Iterator for HotEntryMerger {
447 type Item = (usize, HotEntry);
448
449 fn next(&mut self) -> Option<Self::Item> {
450 self.next_unique()
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Tree type used by every test: integer runs backed by vectors.
    type IntTree = TournamentTree<std::vec::IntoIter<i32>, i32>;

    /// Builds a tree over the given integer runs.
    fn tree_over(runs: Vec<Vec<i32>>) -> IntTree {
        let iters: Vec<_> = runs.into_iter().map(|run| run.into_iter()).collect();
        TournamentTree::new(iters)
    }

    /// Pops until the tree reports nothing left, keeping the source of every item.
    fn drain(tree: &mut IntTree) -> Vec<(usize, i32)> {
        let mut popped = Vec::new();
        while let Some(item) = tree.pop() {
            popped.push(item);
        }
        popped
    }

    /// Merges the runs and returns only the values, in pop order.
    fn merged_values(runs: Vec<Vec<i32>>) -> Vec<i32> {
        let mut tree = tree_over(runs);
        drain(&mut tree).into_iter().map(|(_, value)| value).collect()
    }

    #[test]
    fn test_tournament_tree_basic() {
        let result = merged_values(vec![
            vec![1, 4, 7, 10],
            vec![2, 5, 8, 11],
            vec![3, 6, 9, 12],
        ]);

        assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
    }

    #[test]
    fn test_tournament_tree_uneven() {
        let result = merged_values(vec![vec![1, 10], vec![2, 3, 4, 5], vec![6]]);

        assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 10]);
    }

    #[test]
    fn test_tournament_tree_single() {
        let result = merged_values(vec![vec![1, 2, 3]]);

        assert_eq!(result, vec![1, 2, 3]);
    }

    #[test]
    fn test_tournament_tree_empty() {
        let mut tree = tree_over(Vec::new());

        assert!(tree.pop().is_none());
        assert!(tree.is_empty());
    }

    #[test]
    fn test_tournament_tree_with_duplicates() {
        let tree = tree_over(vec![vec![1, 3, 5], vec![1, 2, 4], vec![2, 3, 6]]);

        // Deduplication is off, so repeated values must all come through.
        let merge_iter = MergeIterator {
            tree,
            last_key: None,
            deduplicate: false,
        };
        let result: Vec<_> = merge_iter.map(|(_, v)| v).collect();
        assert_eq!(result, vec![1, 1, 2, 2, 3, 3, 4, 5, 6]);
    }

    #[test]
    fn test_merge_iterator_deduplicate() {
        let runs: Vec<Vec<i32>> = vec![vec![1, 3, 5], vec![1, 2, 4], vec![2, 3, 6]];
        let iters: Vec<_> = runs.into_iter().map(|run| run.into_iter()).collect();

        let result: Vec<_> = MergeIterator::new(iters, true).map(|(_, v)| v).collect();
        assert_eq!(result, vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_source_tracking() {
        let mut tree = tree_over(vec![vec![2, 5], vec![1, 4], vec![3, 6]]);
        let results = drain(&mut tree);

        // The three smallest values come from sources 1, 0 and 2, in that order.
        assert_eq!(results[0], (1, 1));
        assert_eq!(results[1], (0, 2));
        assert_eq!(results[2], (2, 3));
    }

    #[test]
    fn test_peek() {
        let mut tree = tree_over(vec![vec![2, 4], vec![1, 3]]);

        // Peeking twice must show the same item.
        assert_eq!(tree.peek(), Some((1, &1)));
        assert_eq!(tree.peek(), Some((1, &1)));

        // After popping it, the next smallest item becomes visible.
        assert_eq!(tree.pop(), Some((1, 1)));
        assert_eq!(tree.peek(), Some((0, &2)));
    }
}