Skip to main content

bed_utils/extsort/
chunk.rs

1//! External chunk.
2
3use super::{DiskDeserializer, DiskSerializer};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
7use std::{
8    error::Error,
9    fmt::{self, Display},
10    fs::File,
11    io::{self, Read, Seek, Write},
12    marker::PhantomData,
13};
14
15/// External chunk error
16#[derive(Debug)]
17pub enum ExternalChunkError {
18    /// Common I/O error.
19    IO(io::Error),
20    /// Data serialization error.
21    EncodeError(rkyv::rancor::Error),
22    DecodeError(rkyv::rancor::Error),
23}
24
25impl From<io::Error> for ExternalChunkError {
26    fn from(err: io::Error) -> Self {
27        ExternalChunkError::IO(err)
28    }
29}
30
31impl From<rkyv::rancor::Error> for ExternalChunkError {
32    fn from(err: rkyv::rancor::Error) -> Self {
33        ExternalChunkError::EncodeError(err)
34    }
35}
36
37impl Error for ExternalChunkError {}
38
39impl Display for ExternalChunkError {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            ExternalChunkError::IO(err) => write!(f, "{}", err),
43            ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
44            ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
45        }
46    }
47}
48
49/// External chunk interface. Provides methods for creating a chunk stored on file system and reading data from it.
50pub struct ExternalChunk<T> {
51    reader: Decoder<File>,
52    read_buf: Vec<u8>,
53    item_type: PhantomData<T>,
54}
55
56impl<T> ExternalChunk<T>
57where
58    T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
59    T::Archived: RkyvDeserialize<T, DiskDeserializer>,
60{
61    /// Builds an instance of an external chunk creating file and dumping the items to it.
62    ///
63    /// # Arguments
64    /// * `dir` - Directory the chunk file is created in
65    /// * `items` - Items to be dumped to the chunk
66    /// * `buf_size` - File I/O buffer size
67    pub(crate) fn new(
68        file: File,
69        items: impl IntoIterator<Item = T>,
70        compression: u32,
71    ) -> Result<Self, ExternalChunkError> {
72        let mut builder = ExternalChunkBuilder::new(file, compression)?;
73        for item in items.into_iter() {
74            builder.add(item)?;
75        }
76        builder.finish()
77    }
78
79    ///  Open an existing chunk from file.
80    pub fn open(file: File) -> Result<Self, ExternalChunkError> {
81        let reader = Decoder::new(file)?;
82        Ok(Self {
83            reader,
84            read_buf: Vec::new(),
85            item_type: PhantomData,
86        })
87    }
88
89    pub fn into_inner(self) -> File {
90        self.reader.finish().0
91    }
92}
93
94impl<T> Iterator for ExternalChunk<T>
95where
96    T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
97    T::Archived: RkyvDeserialize<T, DiskDeserializer>,
98{
99    type Item = Result<T, ExternalChunkError>;
100
101    fn next(&mut self) -> Option<Self::Item> {
102        match self.reader.read_u64::<byteorder::LittleEndian>() {
103            Err(err) => match err.kind() {
104                std::io::ErrorKind::UnexpectedEof => None,
105                _ => Some(Err(ExternalChunkError::IO(err))),
106            },
107            Ok(length) => {
108                self.read_buf.resize(length as usize, 0);
109                if let Err(err) = self.reader.read_exact(&mut self.read_buf) {
110                    return Some(Err(ExternalChunkError::IO(err)));
111                } else {
112                    // Bytes are produced by rkyv in `add`, so unchecked read is valid here.
113                    match unsafe {
114                        rkyv::from_bytes_unchecked::<T, rkyv::rancor::Error>(&self.read_buf)
115                    } {
116                        Err(err) => Some(Err(ExternalChunkError::DecodeError(err))),
117                        Ok(ser) => Some(Ok(ser)),
118                    }
119                }
120            }
121        }
122    }
123}
124
125pub struct ExternalChunkBuilder<T> {
126    writer: Encoder<File>,
127    write_buf: rkyv::util::AlignedVec,
128    item_type: PhantomData<T>,
129}
130
131impl<T> ExternalChunkBuilder<T>
132where
133    T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
134    T::Archived: RkyvDeserialize<T, DiskDeserializer>,
135{
136    pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
137        let writer = EncoderBuilder::new().level(compression).build(file)?;
138        Ok(Self {
139            writer,
140            write_buf: rkyv::util::AlignedVec::new(),
141            item_type: PhantomData,
142        })
143    }
144
145    pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
146        // Reuse the write buffer across calls to avoid a heap allocation per item.
147        let mut buf = std::mem::take(&mut self.write_buf);
148        buf.clear();
149        buf = rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&item, buf)
150            .map_err(ExternalChunkError::EncodeError)?;
151        self.writer
152            .write_u64::<byteorder::LittleEndian>(buf.len() as u64)?;
153        self.writer.write_all(&buf)?;
154        self.write_buf = buf;
155        Ok(())
156    }
157
158    pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
159        let mut file = self.writer.finish().0;
160        file.rewind()?;
161        let reader = Decoder::new(file)?;
162
163        Ok(ExternalChunk {
164            reader,
165            read_buf: Vec::new(),
166            item_type: PhantomData,
167        })
168    }
169}
170
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Scratch directory created inside the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Items dumped to a chunk are read back unchanged, for compression levels 0 and 3.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let expected: Vec<i32> = (0..100).collect();

        for level in [0, 3] {
            let file = tempfile::tempfile_in(&tmp_dir).unwrap();
            let chunk: ExternalChunk<i32> =
                ExternalChunk::new(file, expected.clone(), level).unwrap();
            let actual = chunk.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(actual, expected);
        }
    }
}
196}