extsort_lily/
iter.rs

1use std::collections::BinaryHeap;
2use std::cmp::Reverse;
3use std::io::{Read, Write};
4use std::marker::PhantomData;
5
/// Contract for items that can be merge-sorted through external storage.
///
/// An implementor must be totally ordered (`Ord`) and must know how to write itself to a
/// writer of type `W` and how to read itself back from a reader of type `R`.
pub trait Sortable<W: Write, R: Read>: Ord + Sized {

  /// Error type produced by [`serialize`](Self::serialize) and
  /// [`deserialize`](Self::deserialize).
  ///
  /// The original documentation states that this type needs to implement
  /// `From<std::io::Error>`. That bound is not enforced by this declaration.
  /// NOTE(review): presumably it is required by code outside this trait — confirm.
  type Error;

  /// Serializes `self` and writes the encoded form into `w`.
  ///
  /// Returns `Err` if encoding or writing fails.
  fn serialize(&self, w: &mut W) -> Result<(), Self::Error>;

  /// Reads one item from `r` and deserializes it.
  ///
  /// Returns `None` when there are no more items to be read, `Some(Ok(item))` for a
  /// successfully decoded item, and `Some(Err(e))` when reading or decoding fails.
  fn deserialize(r: &mut R) -> Option<Result<Self, Self::Error>>;
}
21
/// The iterator type for producing results sorted by merge sort.
///
/// It yields `Result<T, T::Error>` values, merging the items read from every reader in
/// ascending order.
///
/// It can fail due to issues reading the underlying readers (e.g. intermediate sorted
/// chunks on disk), or due to deserialization issues.
///
/// The writer `W` is not actually used by this struct; it is only carried as a type
/// parameter so that the bound `T: Sortable<W, R>` can be expressed on the impls.
pub struct ExtSortedIterator<T, R, W> {
  /// Heap of `(item, reader index)` pairs. `Reverse` turns the max-heap `BinaryHeap`
  /// into a min-heap, so `pop` returns the smallest pending item.
  tips: BinaryHeap<Reverse<(T, usize)>>,
  /// The readers being merged; the `usize` stored in `tips` indexes into this vector.
  readers: Vec<R>,
  /// Set once a deserialization error has been yielded; afterwards iteration ends.
  failed: bool,
  /// Zero-sized marker tying the otherwise unused `W` parameter to the struct.
  phantom: PhantomData<W>,
}
36
37impl<T, R, W> ExtSortedIterator<T, R, W>
38  where T: Sortable<W, R>,
39        W: Write, R: Read,
40{
41  /// do merge sort on `readers`.
42  pub fn new(mut readers: Vec<R>) -> Result<Self, T::Error> {
43    let mut tips = BinaryHeap::with_capacity(readers.len());
44    for (idx, r) in readers.iter_mut().enumerate() {
45      let item = T::deserialize(r).unwrap()?;
46      tips.push(Reverse((item, idx)));
47    }
48
49    Ok(Self {
50      tips,
51      readers,
52      failed: false,
53      phantom: PhantomData,
54    })
55  }
56}
57
58impl<T, R, W> Iterator for ExtSortedIterator<T, R, W>
59where
60    T: Sortable<W, R>,
61    R: Read,
62    W: Write,
63{
64    type Item = Result<T, T::Error>;
65
66    fn next(&mut self) -> Option<Self::Item> {
67        if self.failed {
68            return None;
69        }
70
71        let Reverse((r, idx)) = self.tips.pop()?;
72        match T::deserialize(&mut self.readers[idx]) {
73          Some(Ok(n)) => {
74            self.tips.push(Reverse((n, idx)));
75          },
76          Some(Err(e)) => {
77            self.failed = true;
78            return Some(Err(e));
79          },
80          None => { },
81        };
82
83        Some(Ok(r))
84    }
85}
86