bed_utils/extsort/
chunk.rs1use bitcode::{DecodeOwned, Encode};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use std::{
7 error::Error,
8 fmt::{self, Display},
9 fs::File,
10 io::{self, Read, Seek, Write},
11 marker::PhantomData,
12};
13
14#[derive(Debug)]
16pub enum ExternalChunkError {
17 IO(io::Error),
19 EncodeError(bitcode::Error),
21}
22
23impl From<io::Error> for ExternalChunkError {
24 fn from(err: io::Error) -> Self {
25 ExternalChunkError::IO(err)
26 }
27}
28
29impl From<bitcode::Error> for ExternalChunkError {
30 fn from(err: bitcode::Error) -> Self {
31 ExternalChunkError::EncodeError(err)
32 }
33}
34
35impl Error for ExternalChunkError {}
36
37impl Display for ExternalChunkError {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 ExternalChunkError::IO(err) => write!(f, "{}", err),
41 ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
42 }
43 }
44}
45
46pub struct ExternalChunk<T> {
48 reader: Decoder<File>,
49 item_type: PhantomData<T>,
50}
51
52impl<T> ExternalChunk<T>
53where
54 T: Encode,
55{
56 pub(crate) fn new(
63 file: File,
64 items: impl IntoIterator<Item = T>,
65 compression: u32,
66 ) -> Result<Self, ExternalChunkError> {
67 let mut builder = ExternalChunkBuilder::new(file, compression)?;
68 for item in items.into_iter() {
69 builder.add(item)?;
70 }
71 builder.finish()
72 }
73}
74
75impl<T> Iterator for ExternalChunk<T>
76where
77 T: DecodeOwned,
78{
79 type Item = Result<T, ExternalChunkError>;
80
81 fn next(&mut self) -> Option<Self::Item> {
82 match self.reader.read_u64::<byteorder::LittleEndian>() {
83 Err(err) => match err.kind() {
84 std::io::ErrorKind::UnexpectedEof => None,
85 _ => Some(Err(ExternalChunkError::IO(err))),
86 },
87 Ok(length) => {
88 let mut buf = vec![0u8; length as usize];
89 if let Err(err) = self.reader.read_exact(buf.as_mut()) {
90 return Some(Err(ExternalChunkError::IO(err)));
91 } else {
92 match bitcode::decode::<T>(&buf) {
93 Err(err) => Some(Err(ExternalChunkError::from(err))),
94 Ok(ser) => Some(Ok(ser)),
95 }
96 }
97 }
98 }
99 }
100}
101
102pub struct ExternalChunkBuilder<T> {
103 writer: Encoder<File>,
104 item_type: PhantomData<T>,
105}
106
107impl<T: Encode> ExternalChunkBuilder<T> {
108 pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
109 let writer = EncoderBuilder::new().level(compression).build(file)?;
110 Ok(Self {
111 writer,
112 item_type: PhantomData,
113 })
114 }
115
116 pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
117 let result = bitcode::encode(&item);
118 self.writer.write_u64::<byteorder::LittleEndian>(result.len() as u64)?;
119 self.writer.write(&result)?;
120 Ok(())
121 }
122
123 pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
124 let mut file = self.writer.finish().0;
125 file.rewind()?;
126 let reader = Decoder::new(file)?;
127
128 Ok(ExternalChunk {
129 reader,
130 item_type: PhantomData,
131 })
132 }
133}
134
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Fresh temporary directory created under the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Items written to a chunk come back unchanged and in order, both
    /// uncompressed and compressed.
    #[rstest]
    #[case(0)]
    #[case(3)]
    fn test_chunk(tmp_dir: tempfile::TempDir, #[case] compression: u32) {
        let saved: Vec<i32> = (0..100).collect();

        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> =
            ExternalChunk::new(tmp_file, saved.clone(), compression).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }

    /// A chunk built from no items is still readable and yields nothing.
    #[rstest]
    fn test_empty_chunk(tmp_dir: tempfile::TempDir) {
        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<i32> = ExternalChunk::new(tmp_file, Vec::new(), 0).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert!(restored.is_empty());
    }

    /// Records of differing encoded lengths, including an empty string, round-trip.
    #[rstest]
    fn test_variable_length_records(tmp_dir: tempfile::TempDir) {
        let saved: Vec<String> = (0..50).map(|i| "x".repeat(i)).collect();

        let tmp_file = tempfile::tempfile_in(&tmp_dir).unwrap();
        let chunk: ExternalChunk<String> =
            ExternalChunk::new(tmp_file, saved.clone(), 3).unwrap();
        let restored = chunk.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(restored, saved);
    }
}