ext_sort/
chunk.rs

1//! External chunk.
2
3use std::error::Error;
4use std::fmt::{self, Display};
5use std::fs;
6use std::io;
7use std::io::prelude::*;
8use std::marker::PhantomData;
9
10use tempfile;
11
/// Error returned by [`ExternalChunk::build`].
///
/// `S` is the error type reported by the serializer that dumps items into the chunk file.
#[derive(Debug)]
pub enum ExternalChunkError<S: Error> {
    /// Common I/O error.
    IO(io::Error),
    /// Data serialization error.
    SerializationError(S),
}

impl<S> Error for ExternalChunkError<S> where S: Error {}

impl<S> Display for ExternalChunkError<S>
where
    S: Error,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Both variants are transparent: the wrapped error's message is printed as-is.
        let inner: &dyn Display = match self {
            Self::IO(io_err) => io_err,
            Self::SerializationError(ser_err) => ser_err,
        };
        write!(f, "{}", inner)
    }
}

impl<S> From<io::Error> for ExternalChunkError<S>
where
    S: Error,
{
    /// Wraps an I/O error so `?` can be used on I/O calls inside chunk-building code.
    fn from(source: io::Error) -> Self {
        Self::IO(source)
    }
}
37
38/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
39pub trait ExternalChunk<T>: Sized + Iterator<Item = Result<T, Self::DeserializationError>> {
40    /// Error returned when data serialization failed.
41    type SerializationError: Error;
42    /// Error returned when data deserialization failed.
43    type DeserializationError: Error;
44
45    /// Builds an instance of an external chunk creating file and dumping the items to it.
46    ///
47    /// # Arguments
48    /// * `dir` - Directory the chunk file is created in
49    /// * `items` - Items to be dumped to the chunk
50    /// * `buf_size` - File I/O buffer size
51    fn build(
52        dir: &tempfile::TempDir,
53        items: impl IntoIterator<Item = T>,
54        buf_size: Option<usize>,
55    ) -> Result<Self, ExternalChunkError<Self::SerializationError>> {
56        let tmp_file = tempfile::tempfile_in(dir)?;
57
58        let mut chunk_writer = match buf_size {
59            Some(buf_size) => io::BufWriter::with_capacity(buf_size, tmp_file.try_clone()?),
60            None => io::BufWriter::new(tmp_file.try_clone()?),
61        };
62
63        Self::dump(&mut chunk_writer, items).map_err(ExternalChunkError::SerializationError)?;
64
65        chunk_writer.flush()?;
66
67        let mut chunk_reader = match buf_size {
68            Some(buf_size) => io::BufReader::with_capacity(buf_size, tmp_file.try_clone()?),
69            None => io::BufReader::new(tmp_file.try_clone()?),
70        };
71
72        chunk_reader.rewind()?;
73        let file_len = tmp_file.metadata()?.len();
74
75        return Ok(Self::new(chunk_reader.take(file_len)));
76    }
77
78    /// Creates and instance of an external chunk.
79    ///
80    /// # Arguments
81    /// * `reader` - The reader of the file the chunk is stored in
82    fn new(reader: io::Take<io::BufReader<fs::File>>) -> Self;
83
84    /// Dumps items to an external file.
85    ///
86    /// # Arguments
87    /// * `chunk_writer` - The writer of the file the data should be dumped in
88    /// * `items` - Items to be dumped
89    fn dump(
90        chunk_writer: &mut io::BufWriter<fs::File>,
91        items: impl IntoIterator<Item = T>,
92    ) -> Result<(), Self::SerializationError>;
93}
94
/// External chunk backed by RMP (Rust MessagePack).
///
/// Items are stored in the chunk file using the MessagePack serialization format; see
/// [msgpack.org](https://msgpack.org/) for details.
///
/// # Example
///
/// ```no_run
/// use tempfile::TempDir;
/// use ext_sort::{ExternalChunk, RmpExternalChunk};
///
/// let dir = TempDir::new().unwrap();
/// let chunk: RmpExternalChunk<i32> = ExternalChunk::build(&dir, 0..1000, None).unwrap();
/// ```
pub struct RmpExternalChunk<T> {
    /// Buffered reader over the chunk file, limited to the number of bytes it may still read.
    reader: io::Take<io::BufReader<fs::File>>,

    /// Records the item type `T` at the type level; no `T` value is stored.
    item_type: PhantomData<T>,
}
113
114impl<T> ExternalChunk<T> for RmpExternalChunk<T>
115where
116    T: serde::ser::Serialize + serde::de::DeserializeOwned,
117{
118    type SerializationError = rmp_serde::encode::Error;
119    type DeserializationError = rmp_serde::decode::Error;
120
121    fn new(reader: io::Take<io::BufReader<fs::File>>) -> Self {
122        RmpExternalChunk {
123            reader,
124            item_type: PhantomData,
125        }
126    }
127
128    fn dump(
129        mut chunk_writer: &mut io::BufWriter<fs::File>,
130        items: impl IntoIterator<Item = T>,
131    ) -> Result<(), Self::SerializationError> {
132        for item in items.into_iter() {
133            rmp_serde::encode::write(&mut chunk_writer, &item)?;
134        }
135
136        return Ok(());
137    }
138}
139
140impl<T> Iterator for RmpExternalChunk<T>
141where
142    T: serde::ser::Serialize + serde::de::DeserializeOwned,
143{
144    type Item = Result<T, <Self as ExternalChunk<T>>::DeserializationError>;
145
146    fn next(&mut self) -> Option<Self::Item> {
147        if self.reader.limit() == 0 {
148            None
149        } else {
150            match rmp_serde::decode::from_read(&mut self.reader) {
151                Ok(result) => Some(Ok(result)),
152                Err(err) => Some(Err(err)),
153            }
154        }
155    }
156}
157
#[cfg(test)]
mod test {
    use rstest::*;

    use super::{ExternalChunk, RmpExternalChunk};

    /// Temporary directory, created under the current working directory, that holds the chunk
    /// files of a single test.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips items through a chunk with the default buffer size, preserving order.
    #[rstest]
    fn test_rmp_chunk(tmp_dir: tempfile::TempDir) {
        let saved = Vec::from_iter(0..100);

        let chunk: RmpExternalChunk<i32> = ExternalChunk::build(&tmp_dir, saved.clone(), None).unwrap();

        let restored: Result<Vec<i32>, _> = chunk.collect();
        let restored = restored.unwrap();

        assert_eq!(restored, saved);
    }

    /// Same round-trip with a small explicit buffer size, exercising the `Some(buf_size)` path.
    #[rstest]
    fn test_rmp_chunk_with_buf_size(tmp_dir: tempfile::TempDir) {
        let saved: Vec<i32> = (0..100).collect();

        let chunk: RmpExternalChunk<i32> =
            ExternalChunk::build(&tmp_dir, saved.clone(), Some(8)).unwrap();

        let restored = chunk.collect::<Result<Vec<i32>, _>>().unwrap();

        assert_eq!(restored, saved);
    }

    /// A chunk built from no items yields nothing.
    #[rstest]
    fn test_rmp_chunk_empty(tmp_dir: tempfile::TempDir) {
        let mut chunk: RmpExternalChunk<i32> =
            ExternalChunk::build(&tmp_dir, Vec::<i32>::new(), None).unwrap();

        assert!(chunk.next().is_none());
    }
}