Skip to main content

bed_utils/extsort/
chunk.rs

1//! External chunk.
2
3use bitcode::{DecodeOwned, Encode};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use std::{
7    error::Error,
8    fmt::{self, Display},
9    fs::File,
10    io::{self, Read, Seek, Write},
11    marker::PhantomData,
12};
13
14/// External chunk error
15#[derive(Debug)]
16pub enum ExternalChunkError {
17    /// Common I/O error.
18    IO(io::Error),
19    /// Data serialization error.
20    EncodeError(bitcode::Error),
21}
22
23impl From<io::Error> for ExternalChunkError {
24    fn from(err: io::Error) -> Self {
25        ExternalChunkError::IO(err)
26    }
27}
28
29impl From<bitcode::Error> for ExternalChunkError {
30    fn from(err: bitcode::Error) -> Self {
31        ExternalChunkError::EncodeError(err)
32    }
33}
34
35impl Error for ExternalChunkError {}
36
37impl Display for ExternalChunkError {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            ExternalChunkError::IO(err) => write!(f, "{}", err),
41            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
42        }
43    }
44}
45
46/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
47pub struct ExternalChunk<T> {
48    reader: Decoder<File>,
49    item_type: PhantomData<T>,
50}
51
52impl<T> ExternalChunk<T>
53where
54    T: Encode,
55{
56    /// Builds an instance of an external chunk creating file and dumping the items to it.
57    ///
58    /// # Arguments
59    /// * `dir` - Directory the chunk file is created in
60    /// * `items` - Items to be dumped to the chunk
61    /// * `buf_size` - File I/O buffer size
62    pub(crate) fn new(
63        file: File,
64        items: impl IntoIterator<Item = T>,
65        compression: u32,
66    ) -> Result<Self, ExternalChunkError> {
67        let mut builder = ExternalChunkBuilder::new(file, compression)?;
68        for item in items.into_iter() {
69            builder.add(item)?;
70        }
71        builder.finish()
72    }
73}
74
75impl<T> Iterator for ExternalChunk<T>
76where
77    T: DecodeOwned,
78{
79    type Item = Result<T, ExternalChunkError>;
80
81    fn next(&mut self) -> Option<Self::Item> {
82        match self.reader.read_u64::<byteorder::LittleEndian>() {
83            Err(err) => match err.kind() {
84                std::io::ErrorKind::UnexpectedEof => None,
85                _ => Some(Err(ExternalChunkError::IO(err))),
86            },
87            Ok(length) => {
88                let mut buf = vec![0u8; length as usize];
89                if let Err(err) = self.reader.read_exact(buf.as_mut()) {
90                    return Some(Err(ExternalChunkError::IO(err)));
91                } else {
92                    match bitcode::decode::<T>(&buf) {
93                        Err(err) => Some(Err(ExternalChunkError::from(err))),
94                        Ok(ser) => Some(Ok(ser)),
95                    }
96                }
97            }
98        }
99    }
100}
101
102pub struct ExternalChunkBuilder<T> {
103    writer: Encoder<File>,
104    item_type: PhantomData<T>,
105}
106
107impl<T: Encode> ExternalChunkBuilder<T> {
108    pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
109        let writer = EncoderBuilder::new().level(compression).build(file)?;
110        Ok(Self {
111            writer,
112            item_type: PhantomData,
113        })
114    }
115
116    pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
117        let result = bitcode::encode(&item);
118        self.writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
119        self.writer.write(&result)?;
120        Ok(())
121    }
122
123    pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
124        let mut file = self.writer.finish().0;
125        file.rewind()?;
126        let reader = Decoder::new(file)?;
127
128        Ok(ExternalChunk {
129            reader,
130            item_type: PhantomData,
131        })
132    }
133}
134
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Scratch directory created under the current working directory; removed on drop.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips `0..100` through a chunk at LZ4 compression levels 0 and 3.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let expected: Vec<i32> = (0..100).collect();

        for &level in &[0, 3] {
            let file = tempfile::tempfile_in(&tmp_dir).unwrap();
            let chunk: ExternalChunk<i32> =
                ExternalChunk::new(file, expected.clone(), level).unwrap();
            let actual: Vec<i32> = chunk.collect::<Result<_, _>>().unwrap();
            assert_eq!(actual, expected);
        }
    }
}