Skip to main content

bed_utils/extsort/
chunk.rs

1//! External chunk.
2
3use bitcode::{DecodeOwned, Encode};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use std::{
7    error::Error,
8    fmt::{self, Display},
9    fs::File,
10    io::{self, Read, Seek, Write},
11    marker::PhantomData,
12};
13
14/// External chunk error
15#[derive(Debug)]
16pub enum ExternalChunkError {
17    /// Common I/O error.
18    IO(io::Error),
19    /// Data serialization error.
20    EncodeError(bitcode::Error),
21}
22
23impl From<io::Error> for ExternalChunkError {
24    fn from(err: io::Error) -> Self {
25        ExternalChunkError::IO(err)
26    }
27}
28
29impl From<bitcode::Error> for ExternalChunkError {
30    fn from(err: bitcode::Error) -> Self {
31        ExternalChunkError::EncodeError(err)
32    }
33}
34
35impl Error for ExternalChunkError {}
36
37impl Display for ExternalChunkError {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            ExternalChunkError::IO(err) => write!(f, "{}", err),
41            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
42        }
43    }
44}
45
46/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
47pub struct ExternalChunk<T> {
48    reader: Decoder<File>,
49    item_type: PhantomData<T>,
50}
51
52impl<T> ExternalChunk<T>
53where
54    T: Encode,
55{
56    /// Builds an instance of an external chunk creating file and dumping the items to it.
57    ///
58    /// # Arguments
59    /// * `dir` - Directory the chunk file is created in
60    /// * `items` - Items to be dumped to the chunk
61    /// * `buf_size` - File I/O buffer size
62    pub(crate) fn new(
63        file: File,
64        items: impl IntoIterator<Item = T>,
65        compression: u32,
66    ) -> Result<Self, ExternalChunkError> {
67        let mut builder = ExternalChunkBuilder::new(file, compression)?;
68        for item in items.into_iter() {
69            builder.add(item)?;
70        }
71        builder.finish()
72    }
73
74    ///  Open an existing chunk from file.
75    pub fn open(file: File) -> Result<Self, ExternalChunkError> {
76        let reader = Decoder::new(file)?;
77        Ok(Self {
78            reader,
79            item_type: PhantomData,
80        })
81    }
82
83    pub fn into_inner(self) -> File {
84        self.reader.finish().0
85    }
86}
87
88impl<T> Iterator for ExternalChunk<T>
89where
90    T: DecodeOwned,
91{
92    type Item = Result<T, ExternalChunkError>;
93
94    fn next(&mut self) -> Option<Self::Item> {
95        match self.reader.read_u64::<byteorder::LittleEndian>() {
96            Err(err) => match err.kind() {
97                std::io::ErrorKind::UnexpectedEof => None,
98                _ => Some(Err(ExternalChunkError::IO(err))),
99            },
100            Ok(length) => {
101                let mut buf = vec![0u8; length as usize];
102                if let Err(err) = self.reader.read_exact(buf.as_mut()) {
103                    return Some(Err(ExternalChunkError::IO(err)));
104                } else {
105                    match bitcode::decode::<T>(&buf) {
106                        Err(err) => Some(Err(ExternalChunkError::from(err))),
107                        Ok(ser) => Some(Ok(ser)),
108                    }
109                }
110            }
111        }
112    }
113}
114
115pub struct ExternalChunkBuilder<T> {
116    writer: Encoder<File>,
117    item_type: PhantomData<T>,
118}
119
120impl<T: Encode> ExternalChunkBuilder<T> {
121    pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
122        let writer = EncoderBuilder::new().level(compression).build(file)?;
123        Ok(Self {
124            writer,
125            item_type: PhantomData,
126        })
127    }
128
129    pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
130        let result = bitcode::encode(&item);
131        self.writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
132        self.writer.write(&result)?;
133        Ok(())
134    }
135
136    pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
137        let mut file = self.writer.finish().0;
138        file.rewind()?;
139        let reader = Decoder::new(file)?;
140
141        Ok(ExternalChunk {
142            reader,
143            item_type: PhantomData,
144        })
145    }
146}
147
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips `saved` through a chunk file written at LZ4 level `compression`. The cases
    /// include empty inputs, so a chunk with no records must also iterate to an empty result.
    #[rstest]
    #[case::empty_uncompressed(Vec::new(), 0)]
    #[case::empty_compressed(Vec::new(), 3)]
    #[case::uncompressed(Vec::from_iter(0..100), 0)]
    #[case::compressed(Vec::from_iter(0..100), 3)]
    fn test_chunk(
        tmp_dir: tempfile::TempDir,
        #[case] saved: Vec<i32>,
        #[case] compression: u32,
    ) {
        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(tmp_file, saved.clone(), compression).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }
}