extsort_lily/
lib.rs

1//! An efficient external sort implementation.
2//!
3//! You start by implementing the [`Sortable`] for your data, and provide your data via an
4//! iterable. Then you create an [`ExtSorter`] to sort data.
5//!
6//! An example is provided in the `examples/` directory.
7
8use std::fs::{File, OpenOptions};
9use std::io::{BufReader, BufWriter, self};
10use std::marker::PhantomData;
11use std::path::Path;
12
13use tempfile::{Builder, TempDir};
14
15mod iter;
16
17pub use iter::{ExtSortedIterator, Sortable};
18
19/// Sort the data
20pub struct ExtSorter<T> {
21  buffer_n_items: usize,
22  tmp_dir: TempDir,
23  phantom: PhantomData<T>,
24}
25
26impl<T> ExtSorter<T>
27where
28    T: Sortable<BufWriter<File>, BufReader<File>>,
29    T::Error: From<io::Error>,
30{
31  /// Create an `ExtSorter` to sort your data.
32  ///
33  /// It will buffer `buffer_n_items` items in memory and sort them, and then write them serialized
34  /// into temporary files.
35  pub fn new(buffer_n_items: usize) -> io::Result<Self> {
36    Ok(ExtSorter {
37      buffer_n_items,
38      tmp_dir: tempdir(None)?,
39      phantom: PhantomData,
40    })
41  }
42
43  /// Same as [`new`](fn@Self::new) but provide a directory to store temporary files instead of system default.
44  pub fn new_in<P: AsRef<Path>>(
45    buffer_n_items: usize, tmp_dir: P,
46  ) -> io::Result<Self> {
47    Ok(ExtSorter {
48      buffer_n_items,
49      tmp_dir: tempdir(Some(tmp_dir.as_ref()))?,
50      phantom: PhantomData,
51    })
52  }
53
54  /// Sort the data.
55  ///
56  /// It returns an iterator to produce sorted results. The sort is unstable.
57  pub fn sort<I>(
58    &self, unsorted: I,
59  ) -> Result<iter::ExtSortedIterator<T, BufReader<File>, BufWriter<File>>, T::Error>
60  where
61      I: Iterator<Item = T>,
62  {
63    let mut chunk_count = 0;
64
65    {
66      let mut current_count = 0;
67      let mut chunk = Vec::new();
68
69      // make the initial chunks on disk
70      for seq in unsorted {
71        current_count += 1;
72        chunk.push(seq);
73        if current_count >= self.buffer_n_items {
74          chunk.sort_unstable();
75          self.write_chunk(
76            &self.tmp_dir.path().join(chunk_count.to_string()),
77            &mut chunk,
78          )?;
79          chunk.clear();
80          current_count = 0;
81          chunk_count += 1;
82        }
83      }
84      // write the last chunk
85      if !chunk.is_empty() {
86        chunk.sort_unstable();
87        self.write_chunk(
88          &self.tmp_dir.path().join(chunk_count.to_string()),
89          &mut chunk,
90        )?;
91        chunk_count += 1;
92      }
93    }
94
95    let readers = (0..chunk_count).map(|i| 
96      File::open(self.tmp_dir.path().join(i.to_string())).map(BufReader::new)
97    ).collect::<Result<Vec<_>, _>>()?;
98    iter::ExtSortedIterator::new(readers)
99  }
100
101  fn write_chunk(&self, file: &Path, chunk: &mut Vec<T>) -> Result<(), T::Error> {
102    let new_file = OpenOptions::new().create(true).write(true).truncate(true).open(file)?;
103    let mut w = BufWriter::new(new_file);
104    for s in chunk {
105      s.serialize(&mut w)?;
106    }
107
108    Ok(())
109  }
110}
111
112fn tempdir(path: Option<&Path>) -> io::Result<TempDir> {
113  let mut builder = Builder::new();
114  builder.prefix("extsort_lily");
115
116  if let Some(p) = path {
117    builder.tempdir_in(p)
118  } else {
119    builder.tempdir()
120  }
121}