bed_utils/extsort/
chunk.rs1use bincode::{Decode, Encode};
4use lz4::{Decoder, Encoder, EncoderBuilder};
5use std::error::Error;
6use std::fmt::{self, Display};
7use std::fs::File;
8use std::io::{self, BufWriter};
9use std::io::{prelude::*, BufReader};
10use std::marker::PhantomData;
11
12use byteorder::{ReadBytesExt, WriteBytesExt};
13use tempfile;
14
15#[derive(Debug)]
17pub enum ExternalChunkError {
18 IO(io::Error),
20 EncodeError(bincode::error::EncodeError),
22 DecodeError(bincode::error::DecodeError),
23}
24
25impl From<io::Error> for ExternalChunkError {
26 fn from(err: io::Error) -> Self {
27 ExternalChunkError::IO(err)
28 }
29}
30
31impl From<bincode::error::EncodeError> for ExternalChunkError {
32 fn from(err: bincode::error::EncodeError) -> Self {
33 ExternalChunkError::EncodeError(err)
34 }
35}
36
37impl From<bincode::error::DecodeError> for ExternalChunkError {
38 fn from(err: bincode::error::DecodeError) -> Self {
39 ExternalChunkError::DecodeError(err)
40 }
41}
42
43impl Error for ExternalChunkError {}
44
45impl Display for ExternalChunkError {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 ExternalChunkError::IO(err) => write!(f, "{}", err),
49 ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
50 ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
51 }
52 }
53}
54
/// A readable stream of length-prefixed, bincode-encoded items; iterate it to
/// decode the items one by one.
pub struct ExternalChunk<T> {
    /// Records the item type yielded by iteration; no `T` is stored inline.
    item_type: PhantomData<T>,
    /// Byte source holding the records (plain or LZ4-decoding, depending on
    /// how the chunk was built).
    reader: Box<dyn Read>,
}
60
61impl<T> ExternalChunk<T>
62where
63 T: Encode,
64{
65 pub(crate) fn new(
72 dir: &tempfile::TempDir,
73 items: impl IntoIterator<Item = T>,
74 compression: Option<u32>,
75 ) -> Result<Self, ExternalChunkError> {
76 let mut builder = ExternalChunkBuilder::new(dir, compression)?;
77 for item in items.into_iter() {
78 builder.add(item)?;
79 }
80 builder.finish()
81 }
82}
83
84impl<T> Iterator for ExternalChunk<T>
85where
86 T: Decode<()>,
87{
88 type Item = Result<T, ExternalChunkError>;
89
90 fn next(&mut self) -> Option<Self::Item> {
91 match self.reader.read_u64::<byteorder::LittleEndian>() {
92 Err(err) => match err.kind() {
93 std::io::ErrorKind::UnexpectedEof => None,
94 _ => Some(Err(ExternalChunkError::IO(err))),
95 },
96 Ok(length) => {
97 let config = bincode::config::standard();
98 let mut buf = vec![0u8; length as usize];
99 if let Err(err) = self.reader.read_exact(buf.as_mut()) {
100 return Some(Err(ExternalChunkError::IO(err)));
101 } else {
102 match bincode::decode_from_slice(&buf, config) {
103 Err(err) => Some(Err(ExternalChunkError::from(err))),
104 Ok((ser, n)) => {
105 if n != length as usize {
106 Some(Err(ExternalChunkError::IO(io::Error::new(
107 io::ErrorKind::InvalidData,
108 format!("Expected {} bytes, got {}", length, n),
109 ))))
110 } else {
111 Some(Ok(ser))
112 }
113 }
114 }
115 }
116 }
117 }
118 }
119}
120
121pub struct ExternalChunkBuilder<T> {
122 writer: Result<Encoder<File>, BufWriter<File>>,
123 item_type: PhantomData<T>,
124}
125
126impl<T: Encode> ExternalChunkBuilder<T> {
127 pub fn new(
128 dir: &tempfile::TempDir,
129 compression: Option<u32>,
130 ) -> Result<Self, ExternalChunkError> {
131 let tmp_file = tempfile::tempfile_in(dir)?;
132
133 let writer = if let Some(lvl) = compression {
134 Ok(EncoderBuilder::new().level(lvl).build(tmp_file)?)
135 } else {
136 Err(BufWriter::new(tmp_file))
137 };
138
139 Ok(Self {
140 writer,
141 item_type: PhantomData,
142 })
143 }
144
145 pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
146 let result = bincode::encode_to_vec(&item, bincode::config::standard())
147 .map_err(ExternalChunkError::from)?;
148
149 self.writer.as_mut().map_or_else(
150 |w| {
151 w.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
152 w.write(&result)?;
153 Ok(())
154 },
155 |w| {
156 w.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
157 w.write(&result)?;
158 Ok(())
159 },
160 )
161 }
162
163 pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
164 let reader: Result<Box<dyn Read>, ExternalChunkError> = self.writer.map_or_else(
165 |w| {
166 let mut file = w.into_inner().unwrap();
167 file.rewind()?;
168 let reader: Box<dyn Read> = Box::new(BufReader::new(file));
169 Ok(reader)
170 },
171 |w| {
172 let mut file = w.finish().0;
173 file.rewind()?;
174 let reader = Box::new(Decoder::new(file)?);
175 Ok(reader)
176 },
177 );
178
179 Ok(ExternalChunk {
180 reader: reader?,
181 item_type: PhantomData,
182 })
183 }
184}
185
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips a range of integers through a chunk, both uncompressed and
    /// LZ4-compressed, and checks that order and values are preserved.
    #[rstest]
    #[case::uncompressed(None)]
    #[case::lz4(Some(3))]
    fn test_chunk(tmp_dir: tempfile::TempDir, #[case] compression: Option<u32>) {
        let saved = Vec::from_iter(0..100);

        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(&tmp_dir, saved.clone(), compression).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }

    /// A chunk built from no items must iterate to nothing, with no error, in
    /// both storage modes.
    #[rstest]
    #[case::uncompressed(None)]
    #[case::lz4(Some(3))]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir, #[case] compression: Option<u32>) {
        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(&tmp_dir, Vec::new(), compression).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert!(restored.is_empty());
    }
}