bed_utils/extsort/
chunk.rs1use super::{DiskDeserializer, DiskSerializer};
4use byteorder::{ReadBytesExt, WriteBytesExt};
5use lz4::{Decoder, Encoder, EncoderBuilder};
6use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
7use std::{
8 error::Error,
9 fmt::{self, Display},
10 fs::File,
11 io::{self, Read, Seek, Write},
12 marker::PhantomData,
13};
14
15#[derive(Debug)]
17pub enum ExternalChunkError {
18 IO(io::Error),
20 EncodeError(rkyv::rancor::Error),
22 DecodeError(rkyv::rancor::Error),
23}
24
25impl From<io::Error> for ExternalChunkError {
26 fn from(err: io::Error) -> Self {
27 ExternalChunkError::IO(err)
28 }
29}
30
31impl From<rkyv::rancor::Error> for ExternalChunkError {
32 fn from(err: rkyv::rancor::Error) -> Self {
33 ExternalChunkError::EncodeError(err)
34 }
35}
36
37impl Error for ExternalChunkError {}
38
39impl Display for ExternalChunkError {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 ExternalChunkError::IO(err) => write!(f, "{}", err),
43 ExternalChunkError::EncodeError(err) => write!(f, "{}", err),
44 ExternalChunkError::DecodeError(err) => write!(f, "{}", err),
45 }
46 }
47}
48
49pub struct ExternalChunk<T> {
51 reader: Decoder<File>,
52 read_buf: Vec<u8>,
53 item_type: PhantomData<T>,
54}
55
56impl<T> ExternalChunk<T>
57where
58 T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
59 T::Archived: RkyvDeserialize<T, DiskDeserializer>,
60{
61 pub(crate) fn new(
68 file: File,
69 items: impl IntoIterator<Item = T>,
70 compression: u32,
71 ) -> Result<Self, ExternalChunkError> {
72 let mut builder = ExternalChunkBuilder::new(file, compression)?;
73 for item in items.into_iter() {
74 builder.add(item)?;
75 }
76 builder.finish()
77 }
78
79 pub fn open(file: File) -> Result<Self, ExternalChunkError> {
81 let reader = Decoder::new(file)?;
82 Ok(Self {
83 reader,
84 read_buf: Vec::new(),
85 item_type: PhantomData,
86 })
87 }
88
89 pub fn into_inner(self) -> File {
90 self.reader.finish().0
91 }
92}
93
94impl<T> Iterator for ExternalChunk<T>
95where
96 T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
97 T::Archived: RkyvDeserialize<T, DiskDeserializer>,
98{
99 type Item = Result<T, ExternalChunkError>;
100
101 fn next(&mut self) -> Option<Self::Item> {
102 match self.reader.read_u64::<byteorder::LittleEndian>() {
103 Err(err) => match err.kind() {
104 std::io::ErrorKind::UnexpectedEof => None,
105 _ => Some(Err(ExternalChunkError::IO(err))),
106 },
107 Ok(length) => {
108 self.read_buf.resize(length as usize, 0);
109 if let Err(err) = self.reader.read_exact(&mut self.read_buf) {
110 return Some(Err(ExternalChunkError::IO(err)));
111 } else {
112 match unsafe {
114 rkyv::from_bytes_unchecked::<T, rkyv::rancor::Error>(&self.read_buf)
115 } {
116 Err(err) => Some(Err(ExternalChunkError::DecodeError(err))),
117 Ok(ser) => Some(Ok(ser)),
118 }
119 }
120 }
121 }
122 }
123}
124
125pub struct ExternalChunkBuilder<T> {
126 writer: Encoder<File>,
127 write_buf: rkyv::util::AlignedVec,
128 item_type: PhantomData<T>,
129}
130
131impl<T> ExternalChunkBuilder<T>
132where
133 T: Archive + for<'a> RkyvSerialize<DiskSerializer<'a>>,
134 T::Archived: RkyvDeserialize<T, DiskDeserializer>,
135{
136 pub fn new(file: File, compression: u32) -> Result<Self, ExternalChunkError> {
137 let writer = EncoderBuilder::new().level(compression).build(file)?;
138 Ok(Self {
139 writer,
140 write_buf: rkyv::util::AlignedVec::new(),
141 item_type: PhantomData,
142 })
143 }
144
145 pub fn add(&mut self, item: T) -> Result<(), ExternalChunkError> {
146 let mut buf = std::mem::take(&mut self.write_buf);
148 buf.clear();
149 buf = rkyv::api::high::to_bytes_in::<_, rkyv::rancor::Error>(&item, buf)
150 .map_err(ExternalChunkError::EncodeError)?;
151 self.writer
152 .write_u64::<byteorder::LittleEndian>(buf.len() as u64)?;
153 self.writer.write_all(&buf)?;
154 self.write_buf = buf;
155 Ok(())
156 }
157
158 pub fn finish(self) -> Result<ExternalChunk<T>, ExternalChunkError> {
159 let mut file = self.writer.finish().0;
160 file.rewind()?;
161 let reader = Decoder::new(file)?;
162
163 Ok(ExternalChunk {
164 reader,
165 read_buf: Vec::new(),
166 item_type: PhantomData,
167 })
168 }
169}
170
#[cfg(test)]
mod test {
    use rstest::*;

    use super::ExternalChunk;

    /// Temporary directory created inside the current working directory.
    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Items written to a chunk are read back unchanged, at compression
    /// levels 0 and 3.
    #[rstest]
    fn test_chunk(tmp_dir: tempfile::TempDir) {
        let expected: Vec<i32> = (0..100).collect();

        for level in [0, 3] {
            let backing = tempfile::tempfile_in(&tmp_dir).unwrap();
            let chunk: ExternalChunk<i32> =
                ExternalChunk::new(backing, expected.clone(), level).unwrap();
            let actual = chunk.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(actual, expected);
        }
    }
}