bed_utils/extsort/
chunk.rs1use bincode_next::{Decode, Encode};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use std::{
7 error::Error,
8 fmt::{self, Display},
9 fs::File,
10 io::{self, Read, Seek, Write},
11 marker::PhantomData,
12};
13
/// Errors produced while writing to or reading from an external chunk.
#[derive(Debug)]
pub enum ExternalChunkError {
    /// An I/O error from the underlying file or the compression stream.
    IO(io::Error),
    /// An item could not be encoded by bincode.
    EncodeError(bincode_next::error::EncodeError),
    /// An item could not be decoded by bincode.
    DecodeError(bincode_next::error::DecodeError),
}
23
24impl From<io::Error> for ExternalChunkError {
25 fn from(err: io::Error) -> Self {
26 ExternalChunkError::IO(err)
27 }
28}
29
30
31impl From<bincode_next::error::EncodeError> for ExternalChunkError {
32 fn from(err: bincode_next::error::EncodeError) -> Self {
33 ExternalChunkError::EncodeError(err)
34 }
35}
36
37impl From<bincode_next::error::DecodeError> for ExternalChunkError {
38 fn from(err: bincode_next::error::DecodeError) -> Self {
39 ExternalChunkError::DecodeError(err)
40 }
41}
42
// Uses the trait's default methods only (no `source()` override); the wrapped
// error's message is surfaced through the `Display` implementation instead.
impl Error for ExternalChunkError {}
44
45impl Display for ExternalChunkError {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 ExternalChunkError::IO(err) => write!(f, "{}", err),
49 ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
50 ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
51 }
52 }
53}
54
/// A file-backed chunk of items of type `T`, read back through an LZ4 decoder.
///
/// When `T` is decodable the chunk is an iterator that yields
/// `Result<T, ExternalChunkError>` for each stored item.
pub struct ExternalChunk<T> {
    /// LZ4 decoder wrapping the file that holds the serialized items.
    reader: Decoder<File>,
    /// Ties the chunk to its item type without storing a `T`.
    item_type: PhantomData<T>,
}
60
61impl<T> ExternalChunk<T>
62where
63 T: Encode,
64{
65 pub(crate) fn new(
72 file: File,
73 items: impl IntoIterator<Item = T>,
74 compression: u32,
75 ) -> Result<Self, ExternalChunkError> {
76 let mut builder = ExternalChunkBuilder::new(file, compression)?;
77 for item in items.into_iter() {
78 builder.add(item)?;
79 }
80 builder.finish()
81 }
82
83 pub fn open(file: File) -> Result<Self, ExternalChunkError> {
85 let reader = Decoder::new(file)?;
86 Ok(Self {
87 reader,
88 item_type: PhantomData,
89 })
90 }
91
92 pub fn into_inner(self) -> File {
93 self.reader.finish().0
94 }
95}
96
97impl<T> Iterator for ExternalChunk<T>
98where
99 T: Decode<()>,
100{
101 type Item = Result<T, ExternalChunkError>;
102
103 fn next(&mut self) -> Option<Self::Item> {
104 match self.reader.read_u64::<byteorder::LittleEndian>() {
105 Err(err) => match err.kind() {
106 std::io::ErrorKind::UnexpectedEof => None,
107 _ => Some(Err(ExternalChunkError::IO(err))),
108 },
109 Ok(length) => {
110 let config = bincode_next::config::standard();
111 let mut buf = vec![0u8; length as usize];
112 if let Err(err) = self.reader.read_exact(buf.as_mut()) {
113 return Some(Err(ExternalChunkError::IO(err)));
114 } else {
115 match bincode_next::decode_from_slice(&buf, config) {
116 Err(err) => Some(Err(ExternalChunkError::from(err))),
117 Ok((ser, n)) => {
118 if n != length as usize {
119 Some(Err(ExternalChunkError::IO(io::Error::new(
120 io::ErrorKind::InvalidData,
121 format!("Expected {} bytes, got {}", length, n),
122 ))))
123 } else {
124 Some(Ok(ser))
125 }
126 }
127 }
128 }
129 }
130 }
131 }
132}
133
/// Incrementally writes items of type `T` into an LZ4-compressed file.
///
/// Call `finish` to obtain an `ExternalChunk` that reads the items back.
pub struct ExternalChunkBuilder<T> {
    /// LZ4 encoder wrapping the destination file.
    writer: Encoder<File>,
    /// Ties the builder to its item type without storing a `T`.
    item_type: PhantomData<T>,
}
138
139impl<T: Encode> ExternalChunkBuilder<T> {
140 pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
141 let writer = EncoderBuilder::new().level(compression).build(file)?;
142 Ok(Self {
143 writer,
144 item_type: PhantomData,
145 })
146 }
147
148 pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
149 let result = bincode_next::encode_to_vec(&item, bincode_next::config::standard())
150 .map_err(ExternalChunkError::from)?;
151
152 self.writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
153 self.writer.write(&result)?;
154 Ok(())
155 }
156
157 pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
158 let mut file = self.writer.finish().0;
159 file.rewind()?;
160 let reader = Decoder::new(file)?;
161
162 Ok(ExternalChunk {
163 reader,
164 item_type: PhantomData,
165 })
166 }
167}
168
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Temporary directory created inside the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips the integers `0..100` through a chunk at compression
    /// levels 0 and 3 and checks they come back unchanged.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let saved: Vec<i32> = (0..100).collect();

        for &level in &[0u32, 3] {
            let backing = tempfile::tempfile_in(&tmp_dir).unwrap();
            let chunk: ExternalChunk<i32> =
                ExternalChunk::new(backing, saved.clone(), level).unwrap();
            let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(restored, saved);
        }
    }
}