// bed_utils/extsort/chunk.rs
use std::error::Error;
use std::fmt::{self, Display};
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter};
use std::marker::PhantomData;

use bincode::{Decode, Encode};
use byteorder::{ReadBytesExt, WriteBytesExt};
use lz4::{Decoder, EncoderBuilder};
use tempfile;
13
14#[derive(Debug)]
16pub enum ExternalChunkError {
17 IO(io::Error),
19 EncodeError(bincode::error::EncodeError),
21 DecodeError(bincode::error::DecodeError),
22}
23
24impl From<io::Error> for ExternalChunkError {
25 fn from(err: io::Error) -> Self {
26 ExternalChunkError::IO(err)
27 }
28}
29
30impl From<bincode::error::EncodeError> for ExternalChunkError {
31 fn from(err: bincode::error::EncodeError) -> Self {
32 ExternalChunkError::EncodeError(err)
33 }
34}
35
36impl From<bincode::error::DecodeError> for ExternalChunkError {
37 fn from(err: bincode::error::DecodeError) -> Self {
38 ExternalChunkError::DecodeError(err)
39 }
40}
41
42impl Error for ExternalChunkError {}
43
44impl Display for ExternalChunkError {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 match self {
47 ExternalChunkError::IO(err) => write!(f, "{}", err),
48 ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
49 ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
50 }
51 }
52}
53
/// A batch of items spilled to an anonymous temporary file and streamed back,
/// in the order they were written, through its `Iterator` impl.
pub struct ExternalChunk<T> {
    /// Sequential reader over the chunk file, positioned at the next record.
    /// It is an LZ4 decoder when the chunk was written with compression, and
    /// a buffered reader otherwise.
    reader: Box<dyn Read>,
    /// Ties the chunk to its item type without storing any `T`.
    item_type: PhantomData<T>,
}
59
60impl<T> ExternalChunk<T>
61where
62 T: Encode,
63{
64 pub(crate) fn new(
71 dir: &tempfile::TempDir,
72 items: impl IntoIterator<Item = T>,
73 compression: Option<u32>,
74 ) -> Result<Self, ExternalChunkError> {
75 let mut tmp_file = tempfile::tempfile_in(dir)?;
76
77 if let Some(level) = compression {
78 let mut writer = EncoderBuilder::new()
79 .level(level)
80 .build(tmp_file.try_clone()?)?;
81 dump(&mut writer, items)?;
82 writer.finish().1?;
83 } else {
84 let mut writer = BufWriter::new(tmp_file.try_clone()?);
85 dump(&mut writer, items)?;
86 writer.flush()?;
87 }
88
89 tmp_file.rewind()?;
90 if let Some(_) = compression {
91 let reader = Box::new(Decoder::new(tmp_file)?);
92 Ok(Self {
93 reader,
94 item_type: PhantomData,
95 })
96 } else {
97 let reader = Box::new(BufReader::new(tmp_file));
98 Ok(Self {
99 reader,
100 item_type: PhantomData,
101 })
102 }
103 }
104}
105
106fn dump<T: Encode, R: Write>(
107 chunk_writer: &mut R,
108 items: impl IntoIterator<Item = T>,
109) -> Result<(), ExternalChunkError> {
110 let config = bincode::config::standard();
111 for item in items.into_iter() {
112 let result = bincode::encode_to_vec(&item, config)
113 .map_err(ExternalChunkError::from)?;
114 chunk_writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
115 chunk_writer.write(&result)?;
116 }
117 return Ok(());
118}
119
120impl<T> Iterator for ExternalChunk<T>
121where
122 T: Decode<()>,
123{
124 type Item = Result<T, ExternalChunkError>;
125
126 fn next(&mut self) -> Option<Self::Item> {
127 match self.reader.read_u64::<byteorder::LittleEndian>() {
128 Err(err) => match err.kind() {
129 std::io::ErrorKind::UnexpectedEof => None,
130 _ => Some(Err(ExternalChunkError::IO(err))),
131 },
132 Ok(length) => {
133 let config = bincode::config::standard();
134 let mut buf = vec![0u8; length as usize];
135 if let Err(err) = self.reader.read_exact(buf.as_mut()) {
136 return Some(Err(ExternalChunkError::IO(err)));
137 } else {
138 match bincode::decode_from_slice(&buf, config) {
139 Err(err) => Some(Err(ExternalChunkError::from(err))),
140 Ok((ser, n)) => {
141 if n != length as usize {
142 Some(Err(ExternalChunkError::IO(
143 io::Error::new(
144 io::ErrorKind::InvalidData,
145 format!(
146 "Expected {} bytes, got {}",
147 length, n
148 ),
149 )
150 )))
151 } else {
152 Some(Ok(ser))
153 }
154 }
155 }
156 }
157 }
158 }
159 }
160}
161
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Scratch directory for chunk files, removed when the `TempDir` drops.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Items written to a chunk come back unchanged and in order, both for the
    /// plain path and for the LZ4-compressed path.
    #[rstest]
    #[case(None)]
    #[case(Some(4))]
    fn test_chunk(tmp_dir: tempfile::TempDir, #[case] compression: Option<u32>) {
        let saved = Vec::from_iter(0..100);

        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(&tmp_dir, saved.clone(), compression).unwrap();

        let restored: Result<Vec<i32>, _> = chunk.collect();
        let restored = restored.unwrap();

        assert_eq!(restored, saved);
    }

    /// A chunk built from no items yields nothing, with or without compression.
    #[rstest]
    #[case(None)]
    #[case(Some(4))]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir, #[case] compression: Option<u32>) {
        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(&tmp_dir, Vec::new(), compression).unwrap();

        assert_eq!(chunk.count(), 0);
    }
}