1use std::error::Error;
4use std::fmt::{self, Display};
5use std::fs;
6use std::io;
7use std::io::prelude::*;
8use std::marker::PhantomData;
9
10use tempfile;
11
/// Failure raised while building an [`ExternalChunk`].
///
/// `S` is the chunk implementation's serialization error type
/// (see [`ExternalChunk::SerializationError`]).
#[derive(Debug)]
pub enum ExternalChunkError<S>
where
    S: Error,
{
    /// An I/O error, e.g. while creating or accessing the temporary chunk file.
    IO(io::Error),
    /// The chunk implementation failed to serialize the items.
    SerializationError(S),
}

impl<S> Error for ExternalChunkError<S> where S: Error {}

impl<S> Display for ExternalChunkError<S>
where
    S: Error,
{
    /// Displays the wrapped error's message as-is, without any prefix.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner: &dyn Display = match self {
            Self::IO(io_err) => io_err,
            Self::SerializationError(ser_err) => ser_err,
        };
        write!(f, "{}", inner)
    }
}

impl<S> From<io::Error> for ExternalChunkError<S>
where
    S: Error,
{
    /// Wraps an I/O error so `?` can be applied directly to `std::io` calls.
    fn from(err: io::Error) -> Self {
        Self::IO(err)
    }
}
37
38pub trait ExternalChunk<T>: Sized + Iterator<Item = Result<T, Self::DeserializationError>> {
40 type SerializationError: Error;
42 type DeserializationError: Error;
44
45 fn build(
52 dir: &tempfile::TempDir,
53 items: impl IntoIterator<Item = T>,
54 buf_size: Option<usize>,
55 ) -> Result<Self, ExternalChunkError<Self::SerializationError>> {
56 let tmp_file = tempfile::tempfile_in(dir)?;
57
58 let mut chunk_writer = match buf_size {
59 Some(buf_size) => io::BufWriter::with_capacity(buf_size, tmp_file.try_clone()?),
60 None => io::BufWriter::new(tmp_file.try_clone()?),
61 };
62
63 Self::dump(&mut chunk_writer, items).map_err(ExternalChunkError::SerializationError)?;
64
65 chunk_writer.flush()?;
66
67 let mut chunk_reader = match buf_size {
68 Some(buf_size) => io::BufReader::with_capacity(buf_size, tmp_file.try_clone()?),
69 None => io::BufReader::new(tmp_file.try_clone()?),
70 };
71
72 chunk_reader.rewind()?;
73 let file_len = tmp_file.metadata()?.len();
74
75 return Ok(Self::new(chunk_reader.take(file_len)));
76 }
77
78 fn new(reader: io::Take<io::BufReader<fs::File>>) -> Self;
83
84 fn dump(
90 chunk_writer: &mut io::BufWriter<fs::File>,
91 items: impl IntoIterator<Item = T>,
92 ) -> Result<(), Self::SerializationError>;
93}
94
/// An [`ExternalChunk`] whose items are encoded as MessagePack with `rmp_serde`.
#[derive(Debug)]
pub struct RmpExternalChunk<T> {
    /// Reader over the encoded items; its remaining limit is the number of bytes not yet
    /// consumed through it.
    reader: io::Take<io::BufReader<fs::File>>,

    /// Ties the chunk to its item type without storing any `T`.
    item_type: PhantomData<T>,
}
113
114impl<T> ExternalChunk<T> for RmpExternalChunk<T>
115where
116 T: serde::ser::Serialize + serde::de::DeserializeOwned,
117{
118 type SerializationError = rmp_serde::encode::Error;
119 type DeserializationError = rmp_serde::decode::Error;
120
121 fn new(reader: io::Take<io::BufReader<fs::File>>) -> Self {
122 RmpExternalChunk {
123 reader,
124 item_type: PhantomData,
125 }
126 }
127
128 fn dump(
129 mut chunk_writer: &mut io::BufWriter<fs::File>,
130 items: impl IntoIterator<Item = T>,
131 ) -> Result<(), Self::SerializationError> {
132 for item in items.into_iter() {
133 rmp_serde::encode::write(&mut chunk_writer, &item)?;
134 }
135
136 return Ok(());
137 }
138}
139
140impl<T> Iterator for RmpExternalChunk<T>
141where
142 T: serde::ser::Serialize + serde::de::DeserializeOwned,
143{
144 type Item = Result<T, <Self as ExternalChunk<T>>::DeserializationError>;
145
146 fn next(&mut self) -> Option<Self::Item> {
147 if self.reader.limit() == 0 {
148 None
149 } else {
150 match rmp_serde::decode::from_read(&mut self.reader) {
151 Ok(result) => Some(Ok(result)),
152 Err(err) => Some(Err(err)),
153 }
154 }
155 }
156}
157
#[cfg(test)]
mod test {
    use rstest::*;

    use super::{ExternalChunk, RmpExternalChunk};

    #[fixture]
    fn tmp_dir() -> tempfile::TempDir {
        tempfile::tempdir_in("./").unwrap()
    }

    /// Writes `saved` into a fresh chunk in `tmp_dir` and reads every item back.
    fn round_trip(tmp_dir: &tempfile::TempDir, saved: &[i32], buf_size: Option<usize>) -> Vec<i32> {
        let chunk: RmpExternalChunk<i32> =
            ExternalChunk::build(tmp_dir, saved.iter().copied(), buf_size).unwrap();
        chunk.collect::<Result<Vec<i32>, _>>().unwrap()
    }

    #[rstest]
    fn test_rmp_chunk(tmp_dir: tempfile::TempDir) {
        let saved: Vec<i32> = (0..100).collect();
        assert_eq!(round_trip(&tmp_dir, &saved, None), saved);
    }

    #[rstest]
    fn test_rmp_chunk_with_buf_size(tmp_dir: tempfile::TempDir) {
        // 100 small integers encode to 100 bytes, so a 16-byte buffer refills repeatedly
        // on both the write and the read side.
        let saved: Vec<i32> = (0..100).collect();
        assert_eq!(round_trip(&tmp_dir, &saved, Some(16)), saved);
    }

    #[rstest]
    fn test_rmp_chunk_empty(tmp_dir: tempfile::TempDir) {
        assert!(round_trip(&tmp_dir, &[], None).is_empty());
    }
}