1use std::cmp::Ordering;
47use std::marker::PhantomData;
48
49#[derive(Debug, Clone)]
51#[allow(dead_code)]
52struct LoserNode {
53 loser: usize,
55 valid: bool,
57}
58
59impl Default for LoserNode {
60 fn default() -> Self {
61 Self {
62 loser: usize::MAX,
63 valid: false,
64 }
65 }
66}
67
68pub struct TournamentTree<I, T>
86where
87 I: Iterator<Item = T>,
88 T: Ord + Clone,
89{
90 tree: Vec<LoserNode>,
93 iters: Vec<std::iter::Peekable<I>>,
95 winner: usize,
97 k: usize,
99 _phantom: PhantomData<T>,
101}
102
103impl<I, T> TournamentTree<I, T>
104where
105 I: Iterator<Item = T>,
106 T: Ord + Clone,
107{
108 pub fn new(iters: Vec<I>) -> Self {
112 let k = iters.len();
113 if k == 0 {
114 return Self {
115 tree: vec![],
116 iters: vec![],
117 winner: usize::MAX,
118 k: 0,
119 _phantom: PhantomData,
120 };
121 }
122
123 let iters: Vec<_> = iters.into_iter().map(|it| it.peekable()).collect();
125
126 let tree = vec![LoserNode::default(); k];
128
129 let mut this = Self {
130 tree,
131 iters,
132 winner: 0,
133 k,
134 _phantom: PhantomData,
135 };
136
137 this.build();
139 this
140 }
141
142 fn build(&mut self) {
144 if self.k == 0 {
145 self.winner = usize::MAX;
146 return;
147 }
148
149 let mut valid_sources: Vec<usize> = Vec::with_capacity(self.k);
155 for idx in 0..self.k {
156 if self.iters[idx].peek().is_some() {
157 valid_sources.push(idx);
158 }
159 }
160
161 if valid_sources.is_empty() {
162 self.winner = usize::MAX;
163 return;
164 }
165
166 let mut winner = valid_sources[0];
168
169 for &idx in &valid_sources[1..] {
170 if self.compare_sources(idx, winner) == std::cmp::Ordering::Less {
172 let node_idx = (self.k + winner) / 2;
174 if node_idx < self.tree.len() {
175 self.tree[node_idx] = LoserNode {
176 loser: winner,
177 valid: true,
178 };
179 }
180 winner = idx;
181 } else {
182 let node_idx = (self.k + idx) / 2;
184 if node_idx < self.tree.len() {
185 self.tree[node_idx] = LoserNode {
186 loser: idx,
187 valid: true,
188 };
189 }
190 }
191 }
192
193 self.winner = winner;
194 }
195
196 fn compare_sources(&mut self, source_a: usize, source_b: usize) -> std::cmp::Ordering {
199 let key_a = self.iters[source_a].peek().cloned();
201 let key_b = self.iters[source_b].peek().cloned();
203
204 match (key_a, key_b) {
205 (None, None) => std::cmp::Ordering::Equal,
206 (None, Some(_)) => std::cmp::Ordering::Greater, (Some(_), None) => std::cmp::Ordering::Less,
208 (Some(a), Some(b)) => a.cmp(&b),
209 }
210 }
211
212 #[allow(dead_code)]
215 fn set_loser_on_path(&mut self, loser_idx: usize, _winner_idx: usize) {
216 if self.k == 0 {
217 return;
218 }
219
220 let node_idx = (self.k + loser_idx) / 2;
223 if node_idx < self.tree.len() {
224 self.tree[node_idx] = LoserNode {
225 loser: loser_idx,
226 valid: true,
227 };
228 }
229 }
230
231 pub fn pop(&mut self) -> Option<(usize, T)> {
236 if self.winner == usize::MAX || self.k == 0 {
237 return None;
238 }
239
240 let value = self.iters[self.winner].next()?;
242 let old_winner = self.winner;
243
244 self.replay(old_winner);
246
247 Some((old_winner, value))
248 }
249
250 fn replay(&mut self, changed_idx: usize) {
254 if self.k <= 1 {
255 if self.k == 1 && self.iters[0].peek().is_none() {
257 self.winner = usize::MAX;
258 }
259 return;
260 }
261
262 let changed_exhausted = self.iters[changed_idx].peek().is_none();
264
265 if changed_exhausted {
266 self.rebuild();
268 return;
269 }
270
271 let mut winner = changed_idx;
274
275 for idx in 0..self.k {
276 if idx == winner {
277 continue;
278 }
279
280 if self.compare_sources(idx, winner) == std::cmp::Ordering::Less {
281 winner = idx;
282 }
283 }
284
285 self.winner = winner;
286 }
287
288 fn rebuild(&mut self) {
290 self.build();
291 }
292
293 pub fn peek(&mut self) -> Option<(usize, &T)> {
295 if self.winner == usize::MAX || self.k == 0 {
296 return None;
297 }
298 self.iters[self.winner].peek().map(|v| (self.winner, v))
299 }
300
301 pub fn is_empty(&self) -> bool {
303 self.winner == usize::MAX
304 }
305
306 pub fn source_count(&self) -> usize {
308 self.k
309 }
310}
311
312pub struct MergeIterator<I, T>
317where
318 I: Iterator<Item = T>,
319 T: Ord + Clone,
320{
321 tree: TournamentTree<I, T>,
322 last_key: Option<T>,
324 deduplicate: bool,
326}
327
328impl<I, T> MergeIterator<I, T>
329where
330 I: Iterator<Item = T>,
331 T: Ord + Clone,
332{
333 pub fn new(iters: Vec<I>, deduplicate: bool) -> Self {
335 Self {
336 tree: TournamentTree::new(iters),
337 last_key: None,
338 deduplicate,
339 }
340 }
341}
342
343impl<I, T> Iterator for MergeIterator<I, T>
344where
345 I: Iterator<Item = T>,
346 T: Ord + Clone,
347{
348 type Item = (usize, T);
349
350 fn next(&mut self) -> Option<Self::Item> {
351 loop {
352 let (source, value) = self.tree.pop()?;
353
354 if self.deduplicate {
355 if let Some(ref last) = self.last_key {
357 if &value == last {
358 continue;
359 }
360 }
361 self.last_key = Some(value.clone());
362 }
363
364 return Some((source, value));
365 }
366 }
367}
368
369use crate::tiered_memtable::HotEntry;
374
375#[derive(Clone)]
377pub struct KeyedEntry {
378 pub entry: HotEntry,
379}
380
381impl PartialEq for KeyedEntry {
382 fn eq(&self, other: &Self) -> bool {
383 self.entry.key.as_slice() == other.entry.key.as_slice()
384 }
385}
386
387impl Eq for KeyedEntry {}
388
389impl PartialOrd for KeyedEntry {
390 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
391 Some(self.cmp(other))
392 }
393}
394
395impl Ord for KeyedEntry {
396 fn cmp(&self, other: &Self) -> Ordering {
397 self.entry.key.as_slice().cmp(other.entry.key.as_slice())
398 }
399}
400
401pub struct HotEntryMerger {
408 tree: TournamentTree<std::vec::IntoIter<KeyedEntry>, KeyedEntry>,
409 last_key: Option<Vec<u8>>,
410}
411
412impl HotEntryMerger {
413 pub fn new(sources: Vec<Vec<HotEntry>>) -> Self {
417 let iters: Vec<_> = sources
418 .into_iter()
419 .map(|v| {
420 v.into_iter()
421 .map(|e| KeyedEntry { entry: e })
422 .collect::<Vec<_>>()
423 .into_iter()
424 })
425 .collect();
426
427 Self {
428 tree: TournamentTree::new(iters),
429 last_key: None,
430 }
431 }
432
433 pub fn next_unique(&mut self) -> Option<(usize, HotEntry)> {
435 loop {
436 let (source, keyed) = self.tree.pop()?;
437
438 if let Some(ref last) = self.last_key {
440 if keyed.entry.key.as_slice() == last.as_slice() {
441 continue;
442 }
443 }
444
445 self.last_key = Some(keyed.entry.key.to_vec());
446 return Some((source, keyed.entry));
447 }
448 }
449}
450
451impl Iterator for HotEntryMerger {
452 type Item = (usize, HotEntry);
453
454 fn next(&mut self) -> Option<Self::Item> {
455 self.next_unique()
456 }
457}
458
459#[cfg(test)]
464mod tests {
465 use super::*;
466
467 #[test]
468 fn test_tournament_tree_basic() {
469 let sources: Vec<Vec<i32>> = vec![vec![1, 4, 7, 10], vec![2, 5, 8, 11], vec![3, 6, 9, 12]];
470
471 let iters = sources.into_iter().map(|v| v.into_iter());
472 let mut tree = TournamentTree::new(iters.collect());
473
474 let mut result = Vec::new();
475 while let Some((_, val)) = tree.pop() {
476 result.push(val);
477 }
478
479 assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
480 }
481
482 #[test]
483 fn test_tournament_tree_uneven() {
484 let sources: Vec<Vec<i32>> = vec![vec![1, 10], vec![2, 3, 4, 5], vec![6]];
485
486 let iters = sources.into_iter().map(|v| v.into_iter());
487 let mut tree = TournamentTree::new(iters.collect());
488
489 let mut result = Vec::new();
490 while let Some((_, val)) = tree.pop() {
491 result.push(val);
492 }
493
494 assert_eq!(result, vec![1, 2, 3, 4, 5, 6, 10]);
495 }
496
497 #[test]
498 fn test_tournament_tree_single() {
499 let sources: Vec<Vec<i32>> = vec![vec![1, 2, 3]];
500
501 let iters = sources.into_iter().map(|v| v.into_iter());
502 let mut tree = TournamentTree::new(iters.collect());
503
504 let mut result = Vec::new();
505 while let Some((_, val)) = tree.pop() {
506 result.push(val);
507 }
508
509 assert_eq!(result, vec![1, 2, 3]);
510 }
511
512 #[test]
513 fn test_tournament_tree_empty() {
514 let sources: Vec<Vec<i32>> = vec![];
515 let iters = sources.into_iter().map(|v| v.into_iter());
516 let mut tree = TournamentTree::new(iters.collect());
517
518 assert!(tree.pop().is_none());
519 assert!(tree.is_empty());
520 }
521
522 #[test]
523 fn test_tournament_tree_with_duplicates() {
524 let sources: Vec<Vec<i32>> = vec![
525 vec![1, 3, 5],
526 vec![1, 2, 4], vec![2, 3, 6], ];
529
530 let iters = sources.into_iter().map(|v| v.into_iter());
531 let tree = TournamentTree::new(iters.collect());
532
533 let merge_iter = MergeIterator {
535 tree,
536 last_key: None,
537 deduplicate: false,
538 };
539 let result: Vec<_> = merge_iter.map(|(_, v)| v).collect();
540 assert_eq!(result, vec![1, 1, 2, 2, 3, 3, 4, 5, 6]);
541 }
542
543 #[test]
544 fn test_merge_iterator_deduplicate() {
545 let sources: Vec<Vec<i32>> = vec![vec![1, 3, 5], vec![1, 2, 4], vec![2, 3, 6]];
546
547 let iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
548 let merge_iter = MergeIterator::new(iters, true);
549
550 let result: Vec<_> = merge_iter.map(|(_, v)| v).collect();
551 assert_eq!(result, vec![1, 2, 3, 4, 5, 6]);
552 }
553
554 #[test]
555 fn test_source_tracking() {
556 let sources: Vec<Vec<i32>> = vec![vec![2, 5], vec![1, 4], vec![3, 6]];
557
558 let iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
559 let mut tree = TournamentTree::new(iters);
560
561 let mut results = Vec::new();
562 while let Some((source, val)) = tree.pop() {
563 results.push((source, val));
564 }
565
566 assert_eq!(results[0], (1, 1)); assert_eq!(results[1], (0, 2)); assert_eq!(results[2], (2, 3)); }
571
572 #[test]
573 fn test_peek() {
574 let sources: Vec<Vec<i32>> = vec![vec![2, 4], vec![1, 3]];
575
576 let iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
577 let mut tree = TournamentTree::new(iters);
578
579 assert_eq!(tree.peek(), Some((1, &1)));
581 assert_eq!(tree.peek(), Some((1, &1)));
582
583 assert_eq!(tree.pop(), Some((1, 1)));
585 assert_eq!(tree.peek(), Some((0, &2)));
586 }
587}