// parallel_processor/buckets/readers/generic_binary_reader.rs
use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::readers::BucketReader;
3use crate::buckets::writers::{BucketCheckpoints, BucketHeader};
4use crate::memory_fs::file::reader::{FileRangeReference, FileReader};
5use crate::memory_fs::{MemoryFs, RemoveFileMode};
6use desse::Desse;
7use desse::DesseSized;
8use replace_with::replace_with_or_abort;
9use serde::de::DeserializeOwned;
10use std::io::{Read, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::AtomicU64;
13use std::sync::atomic::Ordering;
14
15use super::async_binary_reader::AllowedCheckpointStrategy;
16
/// Strategy for decoding the (possibly compressed) chunks of a bucket file.
///
/// An implementor pairs a 16-byte magic header — used to validate the file
/// format on open — with a stream wrapper that decodes the raw bytes of one
/// chunk at a time.
pub trait ChunkDecoder {
    /// Magic bytes expected in the bucket header; checked when a file is opened.
    const MAGIC_HEADER: &'static [u8; 16];
    /// The decoded stream type produced by [`Self::decode_stream`].
    type ReaderType: Read;
    /// Wraps `reader` (already positioned at the chunk start) into a decoding
    /// stream covering `size` bytes of the underlying file.
    fn decode_stream(reader: FileReader, size: u64) -> Self::ReaderType;
    /// Tears down a decoding stream, recovering the underlying file reader.
    fn dispose_stream(stream: Self::ReaderType) -> FileReader;
}
23
/// Reader for a chunked (checkpointed) binary bucket file, supporting both a
/// single sequential decoding stream and lock-free parallel chunk consumption.
pub struct GenericChunkedBinaryReader<D: ChunkDecoder> {
    // Whether/how the backing file is removed when the reader is dropped.
    remove_file: RemoveFileMode,
    // State for the sequential (single-stream) reading path.
    sequential_reader: SequentialReader<D>,
    // Independent file handle, cloned once per parallel chunk request.
    parallel_reader: FileReader,
    // Next chunk index to hand out to parallel readers.
    parallel_index: AtomicU64,
    // Path of the backing file (reported by `get_name`, removed on drop).
    file_path: PathBuf,
    // Serialized (bincode) format metadata copied from the bucket header.
    format_data_info: Vec<u8>,
}
32
/// Result of claiming a chunk for parallel reading: either a decoding stream
/// or a raw file range that can be forwarded without decompression.
/// `T` is the (optionally deserialized) per-checkpoint data.
// NOTE(review): "Passtrough" is a typo for "Passthrough", but the variant is
// part of the public API and cannot be renamed without breaking callers.
pub enum ChunkReader<T, R> {
    /// A decoding stream over the chunk, plus its optional checkpoint data.
    Reader(R, Option<T>),
    /// The chunk's raw byte range, forwarded without decoding.
    Passtrough {
        file_range: FileRangeReference,
        data: Option<T>,
    },
}
40
/// Outcome of [`GenericChunkedBinaryReader::decode_bucket_items_parallel`]:
/// the chunk was either fully decompressed and its items dispatched, or it
/// was eligible for pass-through and is returned as a raw file range.
pub enum DecodeItemsStatus<T> {
    /// The chunk was decompressed and every item was passed to the callback.
    Decompressed,
    /// The chunk was not decoded; its raw range and checkpoint data are
    /// returned for the caller to forward as-is.
    Passtrough {
        file_range: FileRangeReference,
        data: Option<T>,
    },
}
48
49unsafe impl<D: ChunkDecoder> Sync for GenericChunkedBinaryReader<D> {}
50
/// Internal state for sequentially decoding all chunks of a bucket file.
struct SequentialReader<D: ChunkDecoder> {
    // Decoding stream over the chunk currently being read.
    reader: D::ReaderType,
    // Checkpoint table mapping each chunk index to its file offset.
    index: BucketCheckpoints,
    // End of the data region (the index offset); sizes the last chunk.
    last_byte_position: u64,
    // Index of the chunk currently being decoded.
    index_position: u64,
}
57
58impl<D: ChunkDecoder> SequentialReader<D> {
59 fn get_chunk_size(
60 checkpoints: &BucketCheckpoints,
61 last_byte_position: u64,
62 index: usize,
63 ) -> u64 {
64 if checkpoints.index.len() > (index + 1) as usize {
65 checkpoints.index[(index + 1) as usize].offset
66 - checkpoints.index[index as usize].offset
67 } else {
68 last_byte_position - checkpoints.index[index as usize].offset
69 }
70 }
71}
72
impl<D: ChunkDecoder> Read for SequentialReader<D> {
    /// Reads decoded bytes, transparently advancing to the next chunk when the
    /// current chunk's stream is exhausted. Returns `Ok(0)` only after the
    /// last chunk has been fully consumed.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            match self.reader.read(buf) {
                Ok(read) => {
                    if read != 0 {
                        return Ok(read);
                    }
                    // read == 0: current chunk exhausted; fall through to
                    // advance to the next chunk.
                }
                Err(err) => {
                    return Err(err);
                }
            }
            self.index_position += 1;

            // No further checkpoints: report end of stream.
            if self.index_position >= self.index.index.len() as u64 {
                return Ok(0);
            }

            // Swap the decoder in place: recover the raw file reader from the
            // exhausted stream, seek it to the next chunk's offset, and wrap
            // it in a fresh decoding stream sized to that chunk.
            // `replace_with_or_abort` aborts the process if this closure
            // panics, which keeps `self.reader` from being left invalid.
            replace_with_or_abort(&mut self.reader, |reader| {
                let mut file = D::dispose_stream(reader);
                file.seek(SeekFrom::Start(
                    self.index.index[self.index_position as usize].offset,
                ))
                .unwrap();
                let size = SequentialReader::<D>::get_chunk_size(
                    &self.index,
                    self.last_byte_position,
                    self.index_position as usize,
                );
                D::decode_stream(file, size)
            });
        }
    }
}
113
114impl<D: ChunkDecoder> GenericChunkedBinaryReader<D> {
115 pub fn new(
116 name: impl AsRef<Path>,
117 remove_file: RemoveFileMode,
118 prefetch_amount: Option<usize>,
119 ) -> Self {
120 let mut file = FileReader::open(&name, prefetch_amount)
121 .unwrap_or_else(|| panic!("Cannot open file {}", name.as_ref().display()));
122
123 let mut header_buffer = [0; BucketHeader::SIZE];
124 file.read_exact(&mut header_buffer)
125 .unwrap_or_else(|_| panic!("File {} is corrupted", name.as_ref().display()));
126
127 let header: BucketHeader = BucketHeader::deserialize_from(&header_buffer);
128 assert_eq!(&header.magic, D::MAGIC_HEADER);
129
130 file.seek(SeekFrom::Start(header.index_offset)).unwrap();
131 let index: BucketCheckpoints = bincode::deserialize_from(&mut file).unwrap();
132
133 file.seek(SeekFrom::Start(index.index[0].offset)).unwrap();
140
141 let size = SequentialReader::<D>::get_chunk_size(&index, header.index_offset, 0);
142
143 Self {
144 sequential_reader: SequentialReader {
145 reader: D::decode_stream(file, size),
146 index,
147 last_byte_position: header.index_offset,
148 index_position: 0,
149 },
150 parallel_reader: FileReader::open(&name, prefetch_amount).unwrap(),
151 parallel_index: AtomicU64::new(0),
152 remove_file,
153 file_path: name.as_ref().to_path_buf(),
154 format_data_info: header.data_format_info.to_vec(),
155 }
156 }
157
158 pub fn get_data_format_info<T: DeserializeOwned>(&self) -> T {
159 bincode::deserialize(&self.format_data_info).unwrap()
160 }
161
162 pub fn get_length(&self) -> usize {
163 self.parallel_reader.total_file_size()
164 }
165
166 pub fn get_chunks_count(&self) -> usize {
167 self.sequential_reader.index.index.len()
168 }
169
170 pub fn is_finished(&self) -> bool {
171 self.parallel_index.load(Ordering::Relaxed) as usize
172 >= self.sequential_reader.index.index.len()
173 }
174
175 pub fn get_single_stream<'a>(&'a mut self) -> impl Read + 'a {
176 &mut self.sequential_reader
177 }
178
179 pub fn get_read_parallel_stream_with_chunk_type<T: DeserializeOwned>(
180 &self,
181 allowed_strategy: AllowedCheckpointStrategy<[u8]>,
182 ) -> Option<ChunkReader<T, D::ReaderType>> {
183 match self.get_read_parallel_stream(allowed_strategy) {
184 None => None,
185 Some(ChunkReader::Reader(stream, data)) => Some(ChunkReader::Reader(
186 stream,
187 data.map(|data| bincode::deserialize(&data).unwrap()),
188 )),
189 Some(ChunkReader::Passtrough { file_range, data }) => Some(ChunkReader::Passtrough {
190 file_range,
191 data: data.map(|data| bincode::deserialize(&data).unwrap()),
192 }),
193 }
194 }
195
196 pub fn get_read_parallel_stream(
197 &self,
198 allowed_strategy: AllowedCheckpointStrategy<[u8]>,
199 ) -> Option<ChunkReader<Vec<u8>, D::ReaderType>> {
200 let index = self.parallel_index.fetch_add(1, Ordering::Relaxed) as usize;
201
202 if index >= self.sequential_reader.index.index.len() {
203 return None;
204 }
205
206 let addr_start = self.sequential_reader.index.index[index].offset as usize;
207 let checkpoint_data = self.sequential_reader.index.index[index].data.clone();
208
209 let mut reader = self.parallel_reader.clone();
210 reader.seek(SeekFrom::Start(addr_start as u64)).unwrap();
211
212 let size = SequentialReader::<D>::get_chunk_size(
213 &self.sequential_reader.index,
214 self.sequential_reader.last_byte_position,
215 index,
216 );
217
218 match allowed_strategy {
219 AllowedCheckpointStrategy::DecompressOnly => Some(ChunkReader::Reader(
220 D::decode_stream(reader, size),
221 checkpoint_data,
222 )),
223 AllowedCheckpointStrategy::AllowPasstrough(checker) => {
224 if checker(checkpoint_data.as_deref()) {
225 let file_range = (addr_start as u64)..(addr_start as u64 + size);
226 Some(ChunkReader::Passtrough {
227 file_range: reader.get_range_reference(file_range),
228 data: checkpoint_data,
229 })
230 } else {
231 Some(ChunkReader::Reader(
232 D::decode_stream(reader, size),
233 checkpoint_data,
234 ))
235 }
236 }
237 }
238 }
239
240 pub fn decode_bucket_items_parallel<
241 S: BucketItemSerializer,
242 F: for<'a> FnMut(S::ReadType<'a>, &mut S::ExtraDataBuffer, Option<&S::CheckpointData>),
243 >(
244 &self,
245 mut buffer: S::ReadBuffer,
246 mut extra_buffer: S::ExtraDataBuffer,
247 allowed_strategy: AllowedCheckpointStrategy<[u8]>,
248 mut func: F,
249 deserializer_init_data: S::InitData,
250 ) -> Option<DecodeItemsStatus<S::CheckpointData>> {
251 let stream = match self
252 .get_read_parallel_stream_with_chunk_type::<S::CheckpointData>(allowed_strategy)
253 {
254 None => return None,
255 Some(stream) => stream,
256 };
257
258 let mut deserializer = S::new(deserializer_init_data);
259
260 match stream {
261 ChunkReader::Reader(mut stream, data) => {
262 while let Some(el) =
263 deserializer.read_from(&mut stream, &mut buffer, &mut extra_buffer)
264 {
265 func(el, &mut extra_buffer, data.as_ref());
266 }
267 Some(DecodeItemsStatus::Decompressed)
268 }
269 ChunkReader::Passtrough { file_range, data } => {
270 Some(DecodeItemsStatus::Passtrough { file_range, data })
271 }
272 }
273 }
274}
275
276impl<D: ChunkDecoder> BucketReader for GenericChunkedBinaryReader<D> {
277 fn decode_all_bucket_items<
278 S: BucketItemSerializer,
279 F: for<'a> FnMut(S::ReadType<'a>, &mut S::ExtraDataBuffer),
280 >(
281 mut self,
282 mut buffer: S::ReadBuffer,
283 extra_buffer: &mut S::ExtraDataBuffer,
284 mut func: F,
285 deserializer_init_data: S::InitData,
286 ) {
287 let mut stream = self.get_single_stream();
288
289 let mut deserializer = S::new(deserializer_init_data);
290 while let Some(el) = deserializer.read_from(&mut stream, &mut buffer, extra_buffer) {
291 func(el, extra_buffer);
292 }
293 }
294
295 fn get_name(&self) -> PathBuf {
296 self.file_path.clone()
297 }
298}
299
impl<D: ChunkDecoder> Drop for GenericChunkedBinaryReader<D> {
    fn drop(&mut self) {
        // Remove (or keep) the backing file according to `remove_file`.
        // NOTE(review): `unwrap()` inside `drop` panics on failure; if this
        // drop runs during unwinding the process aborts — consider handling
        // or logging the error instead.
        MemoryFs::remove_file(&self.file_path, self.remove_file).unwrap();
    }
}