bed_utils/extsort/
chunk.rs1use bitcode::{DecodeOwned, Encode};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use std::{
7 error::Error,
8 fmt::{self, Display},
9 fs::File,
10 io::{self, Read, Seek, Write},
11 marker::PhantomData,
12};
13
14#[derive(Debug)]
16pub enum ExternalChunkError {
17 IO(io::Error),
19 EncodeError(bitcode::Error),
21}
22
23impl From<io::Error> for ExternalChunkError {
24 fn from(err: io::Error) -> Self {
25 ExternalChunkError::IO(err)
26 }
27}
28
29impl From<bitcode::Error> for ExternalChunkError {
30 fn from(err: bitcode::Error) -> Self {
31 ExternalChunkError::EncodeError(err)
32 }
33}
34
35impl Error for ExternalChunkError {}
36
37impl Display for ExternalChunkError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 ExternalChunkError::IO(err) => write!(f, "{}", err),
41 ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
42 }
43 }
44}
45
46pub struct ExternalChunk<T> {
48 reader: Decoder<File>,
49 item_type: PhantomData<T>,
50}
51
52impl<T> ExternalChunk<T>
53where
54 T: Encode,
55{
56 pub(crate) fn new(
63 file: File,
64 items: impl IntoIterator<Item = T>,
65 compression: u32,
66 ) -> Result<Self, ExternalChunkError> {
67 let mut builder = ExternalChunkBuilder::new(file, compression)?;
68 for item in items.into_iter() {
69 builder.add(item)?;
70 }
71 builder.finish()
72 }
73
74 pub fn open(file: File) -> Result<Self, ExternalChunkError> {
76 let reader = Decoder::new(file)?;
77 Ok(Self {
78 reader,
79 item_type: PhantomData,
80 })
81 }
82
83 pub fn into_inner(self) -> File {
84 self.reader.finish().0
85 }
86}
87
88impl<T> Iterator for ExternalChunk<T>
89where
90 T: DecodeOwned,
91{
92 type Item = Result<T, ExternalChunkError>;
93
94 fn next(&mut self) -> Option<Self::Item> {
95 match self.reader.read_u64::<byteorder::LittleEndian>() {
96 Err(err) => match err.kind() {
97 std::io::ErrorKind::UnexpectedEof => None,
98 _ => Some(Err(ExternalChunkError::IO(err))),
99 },
100 Ok(length) => {
101 let mut buf = vec![0u8; length as usize];
102 if let Err(err) = self.reader.read_exact(buf.as_mut()) {
103 return Some(Err(ExternalChunkError::IO(err)));
104 } else {
105 match bitcode::decode::<T>(&buf) {
106 Err(err) => Some(Err(ExternalChunkError::from(err))),
107 Ok(ser) => Some(Ok(ser)),
108 }
109 }
110 }
111 }
112 }
113}
114
115pub struct ExternalChunkBuilder<T> {
116 writer: Encoder<File>,
117 item_type: PhantomData<T>,
118}
119
120impl<T: Encode> ExternalChunkBuilder<T> {
121 pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
122 let writer = EncoderBuilder::new().level(compression).build(file)?;
123 Ok(Self {
124 writer,
125 item_type: PhantomData,
126 })
127 }
128
129 pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
130 let result = bitcode::encode(&item);
131 self.writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
132 self.writer.write(&result)?;
133 Ok(())
134 }
135
136 pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
137 let mut file = self.writer.finish().0;
138 file.rewind()?;
139 let reader = Decoder::new(file)?;
140
141 Ok(ExternalChunk {
142 reader,
143 item_type: PhantomData,
144 })
145 }
146}
147
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Scratch directory created under the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Round-trips `0..100` through a chunk at lz4 levels 0 and 3.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let saved: Vec<i32> = (0..100).collect();

        for level in [0, 3] {
            let scratch = tempfile::tempfile_in(&tmp_dir).unwrap();
            let chunk: ExternalChunk<i32> =
                ExternalChunk::new(scratch, saved.clone(), level).unwrap();
            let restored: Vec<i32> = chunk.collect::<Result<_, _>>().unwrap();
            assert_eq!(restored, saved);
        }
    }
}