Skip to main content

bed_utils/extsort/
chunk.rs

1//! External chunk.
2
3use bincode_next::{Decode, Encode};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use std::{
7    error::Error,
8    fmt::{self, Display},
9    fs::File,
10    io::{self, Read, Seek, Write},
11    marker::PhantomData,
12};
13
14/// External chunk error
15#[derive(Debug)]
16pub enum ExternalChunkError {
17    /// Common I/O error.
18    IO(io::Error),
19    /// Data serialization error.
20    EncodeError(bincode_next::error::EncodeError),
21    DecodeError(bincode_next::error::DecodeError),
22}
23
24impl From<io::Error> for ExternalChunkError {
25    fn from(err: io::Error) -> Self {
26        ExternalChunkError::IO(err)
27    }
28}
29
30
31impl From<bincode_next::error::EncodeError> for ExternalChunkError {
32    fn from(err: bincode_next::error::EncodeError) -> Self {
33        ExternalChunkError::EncodeError(err)
34    }
35}
36
37impl From<bincode_next::error::DecodeError> for ExternalChunkError {
38    fn from(err: bincode_next::error::DecodeError) -> Self {
39        ExternalChunkError::DecodeError(err)
40    }
41}
42
43impl Error for ExternalChunkError {}
44
45impl Display for ExternalChunkError {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            ExternalChunkError::IO(err) => write!(f, "{}", err),
49            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
50            ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
51        }
52    }
53}
54
55/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
56pub struct ExternalChunk<T> {
57    reader: Decoder<File>,
58    item_type: PhantomData<T>,
59}
60
61impl<T> ExternalChunk<T>
62where
63    T: Encode,
64{
65    /// Builds an instance of an external chunk creating file and dumping the items to it.
66    ///
67    /// # Arguments
68    /// * `dir` - Directory the chunk file is created in
69    /// * `items` - Items to be dumped to the chunk
70    /// * `buf_size` - File I/O buffer size
71    pub(crate) fn new(
72        file: File,
73        items: impl IntoIterator<Item = T>,
74        compression: u32,
75    ) -> Result<Self, ExternalChunkError> {
76        let mut builder = ExternalChunkBuilder::new(file, compression)?;
77        for item in items.into_iter() {
78            builder.add(item)?;
79        }
80        builder.finish()
81    }
82
83    ///  Open an existing chunk from file.
84    pub fn open(file: File) -> Result<Self, ExternalChunkError> {
85        let reader = Decoder::new(file)?;
86        Ok(Self {
87            reader,
88            item_type: PhantomData,
89        })
90    }
91
92    pub fn into_inner(self) -> File {
93        self.reader.finish().0
94    }
95}
96
97impl<T> Iterator for ExternalChunk<T>
98where
99    T: Decode<()>,
100{
101    type Item = Result<T, ExternalChunkError>;
102
103    fn next(&mut self) -> Option<Self::Item> {
104        match self.reader.read_u64::<byteorder::LittleEndian>() {
105            Err(err) => match err.kind() {
106                std::io::ErrorKind::UnexpectedEof => None,
107                _ => Some(Err(ExternalChunkError::IO(err))),
108            },
109            Ok(length) => {
110                let config = bincode_next::config::standard();
111                let mut buf = vec![0u8; length as usize];
112                if let Err(err) = self.reader.read_exact(buf.as_mut()) {
113                    return Some(Err(ExternalChunkError::IO(err)));
114                } else {
115                    match bincode_next::decode_from_slice(&buf, config) {
116                        Err(err) => Some(Err(ExternalChunkError::from(err))),
117                        Ok((ser, n)) => {
118                            if n != length as usize {
119                                Some(Err(ExternalChunkError::IO(io::Error::new(
120                                    io::ErrorKind::InvalidData,
121                                    format!("Expected {} bytes, got {}", length, n),
122                                ))))
123                            } else {
124                                Some(Ok(ser))
125                            }
126                        }
127                    }
128                }
129            }
130        }
131    }
132}
133
134pub struct ExternalChunkBuilder<T> {
135    writer: Encoder<File>,
136    item_type: PhantomData<T>,
137}
138
139impl<T: Encode> ExternalChunkBuilder<T> {
140    pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
141        let writer = EncoderBuilder::new().level(compression).build(file)?;
142        Ok(Self {
143            writer,
144            item_type: PhantomData,
145        })
146    }
147
148    pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
149        let result = bincode_next::encode_to_vec(&item, bincode_next::config::standard())
150            .map_err(ExternalChunkError::from)?;
151
152        self.writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
153        self.writer.write(&result)?;
154        Ok(())
155    }
156
157    pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
158        let mut file = self.writer.finish().0;
159        file.rewind()?;
160        let reader = Decoder::new(file)?;
161
162        Ok(ExternalChunk {
163            reader,
164            item_type: PhantomData,
165        })
166    }
167}
168
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Writes `items` into a fresh chunk at `compression` and returns what reads back.
    fn round_trip<T>(tmp_dir: &tempfile::TempDir, items: Vec<T>, compression: u32) -> Vec<T>
    where
        T: bincode_next::Encode + bincode_next::Decode<()>,
    {
        let tmp_file = tempfile::tempfile_in(tmp_dir).unwrap();
        let chunk: ExternalChunk<T> = ExternalChunk::new(tmp_file, items, compression).unwrap();
        chunk.collect::<Result<Vec<_>, _>>().unwrap()
    }

    #[rstest]
    #[case::uncompressed(0)]
    #[case::compressed(3)]
    fn test_chunk(tmp_dir: tempfile::TempDir, #[case] compression: u32) {
        let saved: Vec<i32> = (0..100).collect();
        assert_eq!(round_trip(&tmp_dir, saved.clone(), compression), saved);
    }

    #[rstest]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir) {
        let restored: Vec<i32> = round_trip(&tmp_dir, Vec::new(), 0);
        assert!(restored.is_empty());
    }

    #[rstest]
    fn test_variable_length_items(tmp_dir: tempfile::TempDir) {
        // Records of differing sizes exercise the length prefix, not just fixed-width ints.
        let saved: Vec<String> = (0..50).map(|i| "x".repeat(i)).collect();
        assert_eq!(round_trip(&tmp_dir, saved.clone(), 3), saved);
    }
}