bed_utils/extsort/
chunk.rs

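//! External chunk: a temporary file holding length-prefixed, bincode-encoded items
//! (optionally LZ4-compressed) that can be streamed back in the order they were written.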
2
3use bincode::{Decode, Encode};
4use lz4::{Decoder, EncoderBuilder};
5use std::error::Error;
6use std::fmt::{self, Display};
7use std::io::prelude::*;
8use std::io::{self, BufReader, BufWriter};
9use std::marker::PhantomData;
10
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use tempfile;
13
14/// External chunk error
15#[derive(Debug)]
16pub enum ExternalChunkError {
17    /// Common I/O error.
18    IO(io::Error),
19    /// Data serialization error.
20    EncodeError(bincode::error::EncodeError),
21    DecodeError(bincode::error::DecodeError),
22}
23
24impl From<io::Error> for ExternalChunkError {
25    fn from(err: io::Error) -> Self {
26        ExternalChunkError::IO(err)
27    }
28}
29
30impl From<bincode::error::EncodeError> for ExternalChunkError {
31    fn from(err: bincode::error::EncodeError) -> Self {
32        ExternalChunkError::EncodeError(err)
33    }
34}
35
36impl From<bincode::error::DecodeError> for ExternalChunkError {
37    fn from(err: bincode::error::DecodeError) -> Self {
38        ExternalChunkError::DecodeError(err)
39    }
40}
41
42impl Error for ExternalChunkError {}
43
44impl Display for ExternalChunkError {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        match self {
47            ExternalChunkError::IO(err) => write!(f, "{}", err),
48            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
49            ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
50        }
51    }
52}
53
54/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
55pub struct ExternalChunk<T> {
56    reader: Box<dyn Read>,
57    item_type: PhantomData<T>,
58}
59
60impl<T> ExternalChunk<T>
61where
62    T: Encode,
63{
64    /// Builds an instance of an external chunk creating file and dumping the items to it.
65    ///
66    /// # Arguments
67    /// * `dir` - Directory the chunk file is created in
68    /// * `items` - Items to be dumped to the chunk
69    /// * `buf_size` - File I/O buffer size
70    pub(crate) fn new(
71        dir: &tempfile::TempDir,
72        items: impl IntoIterator<Item = T>,
73        compression: Option<u32>,
74    ) -> Result<Self, ExternalChunkError> {
75        let mut tmp_file = tempfile::tempfile_in(dir)?;
76
77        if let Some(level) = compression {
78            let mut writer = EncoderBuilder::new()
79                .level(level)
80                .build(tmp_file.try_clone()?)?;
81            dump(&mut writer, items)?;
82            writer.finish().1?;
83        } else {
84            let mut writer = BufWriter::new(tmp_file.try_clone()?);
85            dump(&mut writer, items)?;
86            writer.flush()?;
87        }
88
89        tmp_file.rewind()?;
90        if let Some(_) = compression {
91            let reader = Box::new(Decoder::new(tmp_file)?);
92            Ok(Self {
93                reader,
94                item_type: PhantomData,
95            })
96        } else {
97            let reader = Box::new(BufReader::new(tmp_file));
98            Ok(Self {
99                reader,
100                item_type: PhantomData,
101            })
102        }
103    }
104}
105
106fn dump<T: Encode, R: Write>(
107    chunk_writer: &mut R,
108    items: impl IntoIterator<Item = T>,
109) -> Result<(), ExternalChunkError> {
110    let config = bincode::config::standard();
111    for item in items.into_iter() {
112        let result = bincode::encode_to_vec(&item, config)
113            .map_err(ExternalChunkError::from)?;
114        chunk_writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
115        chunk_writer.write(&result)?;
116    }
117    return Ok(());
118}
119
120impl<T> Iterator for ExternalChunk<T>
121where
122    T: Decode<()>,
123{
124    type Item = Result<T, ExternalChunkError>;
125
126    fn next(&mut self) -> Option<Self::Item> {
127        match self.reader.read_u64::<byteorder::LittleEndian>() {
128            Err(err) => match err.kind() {
129                std::io::ErrorKind::UnexpectedEof => None,
130                _ => Some(Err(ExternalChunkError::IO(err))),
131            },
132            Ok(length) => {
133                let config = bincode::config::standard();
134                let mut buf = vec![0u8; length as usize];
135                if let Err(err) = self.reader.read_exact(buf.as_mut()) {
136                    return Some(Err(ExternalChunkError::IO(err)));
137                } else {
138                    match bincode::decode_from_slice(&buf, config) {
139                        Err(err) => Some(Err(ExternalChunkError::from(err))),
140                        Ok((ser, n)) => {
141                            if n != length as usize {
142                                Some(Err(ExternalChunkError::IO(
143                                    io::Error::new(
144                                        io::ErrorKind::InvalidData,
145                                        format!(
146                                            "Expected {} bytes, got {}",
147                                            length, n
148                                        ),
149                                    )
150                                )))
151                            } else {
152                                Some(Ok(ser))
153                            }
154                        }
155                    }
156                }
157            }
158        }
159    }
160}
161
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Per-test temporary directory, created under the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips 100 integers through an uncompressed chunk.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let saved = Vec::from_iter(0..100);

        let chunk: ExternalChunk<i32> = ExternalChunk::new(&tmp_dir, saved.clone(), None).unwrap();

        let restored: Result<Vec<i32>, _> = chunk.collect();
        let restored = restored.unwrap();

        assert_eq!(restored, saved);
    }

    /// Same round trip, exercising the LZ4-compressed write/read path.
    #[rstest]
    fn test_chunk_compressed(tmp_dir: tempfile::TempDir) {
        let saved: Vec<i32> = (0..100).collect();

        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(&tmp_dir, saved.clone(), Some(4)).unwrap();
        let restored = chunk.collect::<Result<Vec<i32>, _>>().unwrap();

        assert_eq!(restored, saved);
    }

    /// An empty chunk yields nothing (not even an error), compressed or not.
    #[rstest]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir) {
        for compression in [None, Some(4)] {
            let mut chunk: ExternalChunk<i32> =
                ExternalChunk::new(&tmp_dir, Vec::<i32>::new(), compression).unwrap();
            assert!(chunk.next().is_none());
        }
    }
}