bed_utils/extsort/
chunk.rs

1//! External chunk.
2
3use bincode::{Decode, Encode};
4use lz4::{Decoder, Encoder, EncoderBuilder};
5use std::error::Error;
6use std::fmt::{self, Display};
7use std::fs::File;
8use std::io::{self, BufWriter};
9use std::io::{prelude::*, BufReader};
10use std::marker::PhantomData;
11
12use byteorder::{ReadBytesExt, WriteBytesExt};
13use tempfile;
14
15/// External chunk error
16#[derive(Debug)]
17pub enum ExternalChunkError {
18    /// Common I/O error.
19    IO(io::Error),
20    /// Data serialization error.
21    EncodeError(bincode::error::EncodeError),
22    DecodeError(bincode::error::DecodeError),
23}
24
25impl From<io::Error> for ExternalChunkError {
26    fn from(err: io::Error) -> Self {
27        ExternalChunkError::IO(err)
28    }
29}
30
31impl From<bincode::error::EncodeError> for ExternalChunkError {
32    fn from(err: bincode::error::EncodeError) -> Self {
33        ExternalChunkError::EncodeError(err)
34    }
35}
36
37impl From<bincode::error::DecodeError> for ExternalChunkError {
38    fn from(err: bincode::error::DecodeError) -> Self {
39        ExternalChunkError::DecodeError(err)
40    }
41}
42
43impl Error for ExternalChunkError {}
44
45impl Display for ExternalChunkError {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            ExternalChunkError::IO(err) => write!(f, "{}", err),
49            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
50            ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
51        }
52    }
53}
54
55/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
56pub struct ExternalChunk<T> {
57    reader: Box<dyn Read>,
58    item_type: PhantomData<T>,
59}
60
61impl<T> ExternalChunk<T>
62where
63    T: Encode,
64{
65    /// Builds an instance of an external chunk creating file and dumping the items to it.
66    ///
67    /// # Arguments
68    /// * `dir` - Directory the chunk file is created in
69    /// * `items` - Items to be dumped to the chunk
70    /// * `buf_size` - File I/O buffer size
71    pub(crate) fn new(
72        dir: &tempfile::TempDir,
73        items: impl IntoIterator<Item = T>,
74        compression: Option<u32>,
75    ) -> Result<Self, ExternalChunkError> {
76        let mut builder = ExternalChunkBuilder::new(dir, compression)?;
77        for item in items.into_iter() {
78            builder.add(item)?;
79        }
80        builder.finish()
81    }
82}
83
84impl<T> Iterator for ExternalChunk<T>
85where
86    T: Decode<()>,
87{
88    type Item = Result<T, ExternalChunkError>;
89
90    fn next(&mut self) -> Option<Self::Item> {
91        match self.reader.read_u64::<byteorder::LittleEndian>() {
92            Err(err) => match err.kind() {
93                std::io::ErrorKind::UnexpectedEof => None,
94                _ => Some(Err(ExternalChunkError::IO(err))),
95            },
96            Ok(length) => {
97                let config = bincode::config::standard();
98                let mut buf = vec![0u8; length as usize];
99                if let Err(err) = self.reader.read_exact(buf.as_mut()) {
100                    return Some(Err(ExternalChunkError::IO(err)));
101                } else {
102                    match bincode::decode_from_slice(&buf, config) {
103                        Err(err) => Some(Err(ExternalChunkError::from(err))),
104                        Ok((ser, n)) => {
105                            if n != length as usize {
106                                Some(Err(ExternalChunkError::IO(io::Error::new(
107                                    io::ErrorKind::InvalidData,
108                                    format!("Expected {} bytes, got {}", length, n),
109                                ))))
110                            } else {
111                                Some(Ok(ser))
112                            }
113                        }
114                    }
115                }
116            }
117        }
118    }
119}
120
121pub struct ExternalChunkBuilder<T> {
122    writer: Result<Encoder<File>, BufWriter<File>>,
123    item_type: PhantomData<T>,
124}
125
126impl<T: Encode> ExternalChunkBuilder<T> {
127    pub fn new(
128        dir: &tempfile::TempDir,
129        compression: Option<u32>,
130    ) -> Result<Self, ExternalChunkError> {
131        let tmp_file = tempfile::tempfile_in(dir)?;
132
133        let writer = if let Some(lvl) = compression {
134            Ok(EncoderBuilder::new().level(lvl).build(tmp_file)?)
135        } else {
136            Err(BufWriter::new(tmp_file))
137        };
138
139        Ok(Self {
140            writer,
141            item_type: PhantomData,
142        })
143    }
144
145    pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
146        let result = bincode::encode_to_vec(&item, bincode::config::standard())
147            .map_err(ExternalChunkError::from)?;
148
149        self.writer.as_mut().map_or_else(
150            |w| {
151                w.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
152                w.write(&result)?;
153                Ok(())
154            },
155            |w| {
156                w.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
157                w.write(&result)?;
158                Ok(())
159            },
160        )
161    }
162
163    pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
164        let reader: Result<Box<dyn Read>, ExternalChunkError> = self.writer.map_or_else(
165            |w| {
166                let mut file = w.into_inner().unwrap();
167                file.rewind()?;
168                let reader: Box<dyn Read> = Box::new(BufReader::new(file));
169                Ok(reader)
170            },
171            |w| {
172                let mut file = w.finish().0;
173                file.rewind()?;
174                let reader = Box::new(Decoder::new(file)?);
175                Ok(reader)
176            },
177        );
178
179        Ok(ExternalChunk {
180            reader: reader?,
181            item_type: PhantomData,
182        })
183    }
184}
185
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Temporary directory, created under the current working directory, for the chunk files.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Items written to a chunk are read back unchanged, first without and then with
    /// compression.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let saved: Vec<i32> = (0..100).collect();

        for compression in [None, Some(3)] {
            let chunk: ExternalChunk<i32> =
                ExternalChunk::new(&tmp_dir, saved.clone(), compression).unwrap();
            let restored: Vec<i32> = chunk.collect::<Result<_, _>>().unwrap();
            assert_eq!(restored, saved);
        }
    }
}