parallel_processor/buckets/readers/generic_binary_reader.rs

1use crate::buckets::bucket_writer::BucketItemSerializer;
2use crate::buckets::readers::BucketReader;
3use crate::buckets::writers::{BucketCheckpoints, BucketHeader};
4use crate::memory_fs::file::reader::{FileRangeReference, FileReader};
5use crate::memory_fs::{MemoryFs, RemoveFileMode};
6use desse::Desse;
7use desse::DesseSized;
8use replace_with::replace_with_or_abort;
9use serde::de::DeserializeOwned;
10use std::io::{Read, Seek, SeekFrom};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::AtomicU64;
13use std::sync::atomic::Ordering;
14
15use super::async_binary_reader::AllowedCheckpointStrategy;
16
/// Compile-time selected (de)compression codec used for the chunks of a
/// bucket file.
pub trait ChunkDecoder {
    /// Magic bytes expected in the bucket header; checked on open.
    const MAGIC_HEADER: &'static [u8; 16];
    /// Decoding stream type wrapped around the raw file reader.
    type ReaderType: Read;
    /// Wraps `reader` (positioned at the start of a chunk) into a decoding
    /// stream limited to `size` bytes of encoded data.
    fn decode_stream(reader: FileReader, size: u64) -> Self::ReaderType;
    /// Unwraps the decoding stream, giving back the underlying file reader.
    fn dispose_stream(stream: Self::ReaderType) -> FileReader;
}
23
/// Reader for a chunked (checkpointed) binary bucket file, generic over the
/// chunk codec `D`. Supports both single-threaded sequential reading
/// (`get_single_stream`) and atomic claim-based parallel chunk consumption.
pub struct GenericChunkedBinaryReader<D: ChunkDecoder> {
    // How (and whether) the backing file is removed on drop.
    remove_file: RemoveFileMode,
    // Stateful reader for sequential access; only reached via `&mut self`.
    sequential_reader: SequentialReader<D>,
    // Template reader cloned by each parallel chunk request.
    parallel_reader: FileReader,
    // Next chunk index to hand out to parallel readers.
    parallel_index: AtomicU64,
    file_path: PathBuf,
    // Serialized (bincode) format metadata copied from the bucket header.
    format_data_info: Vec<u8>,
}
32
/// A single chunk claimed for parallel reading, with its optional checkpoint
/// data. NOTE(review): "Passtrough" is a typo for "Passthrough", but the
/// variant name is public API and renaming it would break callers.
pub enum ChunkReader<T, R> {
    /// Chunk that must be read through the decoder stream `R`.
    Reader(R, Option<T>),
    /// Chunk whose raw (still encoded) bytes may be forwarded verbatim.
    Passtrough {
        file_range: FileRangeReference,
        data: Option<T>,
    },
}
40
/// Outcome of `decode_bucket_items_parallel` for one claimed chunk.
pub enum DecodeItemsStatus<T> {
    /// The chunk was decompressed and its items passed to the callback.
    Decompressed,
    /// The chunk was not decoded; the caller gets the raw file range instead.
    Passtrough {
        file_range: FileRangeReference,
        data: Option<T>,
    },
}
48
49unsafe impl<D: ChunkDecoder> Sync for GenericChunkedBinaryReader<D> {}
50
/// Sequential multi-chunk reader: decodes one chunk at a time, re-seeking and
/// re-wrapping the file reader at every checkpoint boundary.
struct SequentialReader<D: ChunkDecoder> {
    // Decoder stream for the chunk currently being read.
    reader: D::ReaderType,
    // Checkpoint table loaded from the bucket's index section.
    index: BucketCheckpoints,
    // End of the data region (i.e. start of the serialized index).
    last_byte_position: u64,
    // Index of the chunk currently being decoded.
    index_position: u64,
}
57
58impl<D: ChunkDecoder> SequentialReader<D> {
59    fn get_chunk_size(
60        checkpoints: &BucketCheckpoints,
61        last_byte_position: u64,
62        index: usize,
63    ) -> u64 {
64        if checkpoints.index.len() > (index + 1) as usize {
65            checkpoints.index[(index + 1) as usize].offset
66                - checkpoints.index[index as usize].offset
67        } else {
68            last_byte_position - checkpoints.index[index as usize].offset
69        }
70    }
71}
72
impl<D: ChunkDecoder> Read for SequentialReader<D> {
    /// Reads decoded bytes, transparently advancing to the next chunk when the
    /// current decoder stream is exhausted. Returns `Ok(0)` only once every
    /// chunk listed in the index has been fully consumed.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            match self.reader.read(buf) {
                Ok(read) => {
                    if read != 0 {
                        return Ok(read);
                    }
                    // read == 0 => current chunk exhausted; fall through and
                    // switch to the next chunk below.
                }
                Err(err) => {
                    return Err(err);
                }
            }
            self.index_position += 1;

            if self.index_position >= self.index.index.len() as u64 {
                // No chunks left: genuine end of stream.
                return Ok(0);
            }

            // Swap the decoder in place: unwrap the file reader from the old
            // stream, seek to the next checkpoint, wrap it in a new decoder.
            replace_with_or_abort(&mut self.reader, |reader| {
                let mut file = D::dispose_stream(reader);
                // This assert sometimes fails as the lz4 library can buffer more data that is then discarded on drop
                // assert_eq!(
                //     file.stream_position().unwrap(),
                //     self.index.index[self.index_position as usize]
                // );
                file.seek(SeekFrom::Start(
                    self.index.index[self.index_position as usize].offset,
                ))
                .unwrap();
                let size = SequentialReader::<D>::get_chunk_size(
                    &self.index,
                    self.last_byte_position,
                    self.index_position as usize,
                );
                D::decode_stream(file, size)
            });
        }
    }
}
113
impl<D: ChunkDecoder> GenericChunkedBinaryReader<D> {
    /// Opens the bucket file `name`, validates the codec magic header, loads
    /// the checkpoint index and positions the sequential stream on the first
    /// chunk.
    ///
    /// # Panics
    /// Panics if the file cannot be opened, the header cannot be read or
    /// deserialized, the magic bytes do not match `D::MAGIC_HEADER`, or the
    /// checkpoint index is empty.
    pub fn new(
        name: impl AsRef<Path>,
        remove_file: RemoveFileMode,
        prefetch_amount: Option<usize>,
    ) -> Self {
        let mut file = FileReader::open(&name, prefetch_amount)
            .unwrap_or_else(|| panic!("Cannot open file {}", name.as_ref().display()));

        let mut header_buffer = [0; BucketHeader::SIZE];
        file.read_exact(&mut header_buffer)
            .unwrap_or_else(|_| panic!("File {} is corrupted", name.as_ref().display()));

        let header: BucketHeader = BucketHeader::deserialize_from(&header_buffer);
        assert_eq!(&header.magic, D::MAGIC_HEADER);

        // The checkpoint index is serialized at `index_offset`, after the
        // data region.
        file.seek(SeekFrom::Start(header.index_offset)).unwrap();
        let index: BucketCheckpoints = bincode::deserialize_from(&mut file).unwrap();

        // crate::log_info!(
        //     "Index: {} for {}",
        //     index.index.len(),
        //     name.as_ref().display()
        // );

        // Rewind to the first chunk so the sequential reader starts there.
        file.seek(SeekFrom::Start(index.index[0].offset)).unwrap();

        // `index_offset` doubles as the end of the data region.
        let size = SequentialReader::<D>::get_chunk_size(&index, header.index_offset, 0);

        Self {
            sequential_reader: SequentialReader {
                reader: D::decode_stream(file, size),
                index,
                last_byte_position: header.index_offset,
                index_position: 0,
            },
            parallel_reader: FileReader::open(&name, prefetch_amount).unwrap(),
            parallel_index: AtomicU64::new(0),
            remove_file,
            file_path: name.as_ref().to_path_buf(),
            format_data_info: header.data_format_info.to_vec(),
        }
    }

    /// Deserializes (bincode) the header's format metadata into `T`.
    ///
    /// # Panics
    /// Panics if the stored metadata does not deserialize as `T`.
    pub fn get_data_format_info<T: DeserializeOwned>(&self) -> T {
        bincode::deserialize(&self.format_data_info).unwrap()
    }

    /// Total size of the underlying file, in bytes.
    pub fn get_length(&self) -> usize {
        self.parallel_reader.total_file_size()
    }

    /// Number of chunks (checkpoints) in the bucket.
    pub fn get_chunks_count(&self) -> usize {
        self.sequential_reader.index.index.len()
    }

    /// True once every chunk has been claimed by a parallel reader.
    pub fn is_finished(&self) -> bool {
        self.parallel_index.load(Ordering::Relaxed) as usize
            >= self.sequential_reader.index.index.len()
    }

    /// Returns a stream that decodes the whole bucket sequentially,
    /// chunk after chunk.
    pub fn get_single_stream<'a>(&'a mut self) -> impl Read + 'a {
        &mut self.sequential_reader
    }

    /// Same as [`Self::get_read_parallel_stream`], but deserializes (bincode)
    /// the chunk's checkpoint data into `T`.
    pub fn get_read_parallel_stream_with_chunk_type<T: DeserializeOwned>(
        &self,
        allowed_strategy: AllowedCheckpointStrategy<[u8]>,
    ) -> Option<ChunkReader<T, D::ReaderType>> {
        match self.get_read_parallel_stream(allowed_strategy) {
            None => None,
            Some(ChunkReader::Reader(stream, data)) => Some(ChunkReader::Reader(
                stream,
                data.map(|data| bincode::deserialize(&data).unwrap()),
            )),
            Some(ChunkReader::Passtrough { file_range, data }) => Some(ChunkReader::Passtrough {
                file_range,
                data: data.map(|data| bincode::deserialize(&data).unwrap()),
            }),
        }
    }

    /// Atomically claims the next unread chunk and returns it either as a
    /// decoder stream or — when `allowed_strategy` permits it for this
    /// chunk's checkpoint data — as a raw passthrough file range.
    /// Returns `None` once all chunks have been claimed.
    pub fn get_read_parallel_stream(
        &self,
        allowed_strategy: AllowedCheckpointStrategy<[u8]>,
    ) -> Option<ChunkReader<Vec<u8>, D::ReaderType>> {
        // NOTE(review): Relaxed ordering — presumably the counter alone
        // coordinates chunk claims and no other data is published through it.
        let index = self.parallel_index.fetch_add(1, Ordering::Relaxed) as usize;

        if index >= self.sequential_reader.index.index.len() {
            return None;
        }

        let addr_start = self.sequential_reader.index.index[index].offset as usize;
        let checkpoint_data = self.sequential_reader.index.index[index].data.clone();

        // Each claim gets its own cloned reader positioned at the chunk start.
        let mut reader = self.parallel_reader.clone();
        reader.seek(SeekFrom::Start(addr_start as u64)).unwrap();

        let size = SequentialReader::<D>::get_chunk_size(
            &self.sequential_reader.index,
            self.sequential_reader.last_byte_position,
            index,
        );

        match allowed_strategy {
            AllowedCheckpointStrategy::DecompressOnly => Some(ChunkReader::Reader(
                D::decode_stream(reader, size),
                checkpoint_data,
            )),
            AllowedCheckpointStrategy::AllowPasstrough(checker) => {
                // The caller-supplied checker decides, per chunk, whether the
                // raw (still encoded) bytes may be forwarded untouched.
                if checker(checkpoint_data.as_deref()) {
                    let file_range = (addr_start as u64)..(addr_start as u64 + size);
                    Some(ChunkReader::Passtrough {
                        file_range: reader.get_range_reference(file_range),
                        data: checkpoint_data,
                    })
                } else {
                    Some(ChunkReader::Reader(
                        D::decode_stream(reader, size),
                        checkpoint_data,
                    ))
                }
            }
        }
    }

    /// Claims the next chunk and, if it must be decompressed, decodes all of
    /// its items through serializer `S`, invoking `func` for each one.
    ///
    /// Returns `None` when no chunks are left; otherwise reports whether the
    /// chunk was decompressed or handed back as a passthrough file range.
    pub fn decode_bucket_items_parallel<
        S: BucketItemSerializer,
        F: for<'a> FnMut(S::ReadType<'a>, &mut S::ExtraDataBuffer, Option<&S::CheckpointData>),
    >(
        &self,
        mut buffer: S::ReadBuffer,
        mut extra_buffer: S::ExtraDataBuffer,
        allowed_strategy: AllowedCheckpointStrategy<[u8]>,
        mut func: F,
        deserializer_init_data: S::InitData,
    ) -> Option<DecodeItemsStatus<S::CheckpointData>> {
        let stream = match self
            .get_read_parallel_stream_with_chunk_type::<S::CheckpointData>(allowed_strategy)
        {
            None => return None,
            Some(stream) => stream,
        };

        let mut deserializer = S::new(deserializer_init_data);

        match stream {
            ChunkReader::Reader(mut stream, data) => {
                while let Some(el) =
                    deserializer.read_from(&mut stream, &mut buffer, &mut extra_buffer)
                {
                    func(el, &mut extra_buffer, data.as_ref());
                }
                Some(DecodeItemsStatus::Decompressed)
            }
            ChunkReader::Passtrough { file_range, data } => {
                Some(DecodeItemsStatus::Passtrough { file_range, data })
            }
        }
    }
}
275
276impl<D: ChunkDecoder> BucketReader for GenericChunkedBinaryReader<D> {
277    fn decode_all_bucket_items<
278        S: BucketItemSerializer,
279        F: for<'a> FnMut(S::ReadType<'a>, &mut S::ExtraDataBuffer),
280    >(
281        mut self,
282        mut buffer: S::ReadBuffer,
283        extra_buffer: &mut S::ExtraDataBuffer,
284        mut func: F,
285        deserializer_init_data: S::InitData,
286    ) {
287        let mut stream = self.get_single_stream();
288
289        let mut deserializer = S::new(deserializer_init_data);
290        while let Some(el) = deserializer.read_from(&mut stream, &mut buffer, extra_buffer) {
291            func(el, extra_buffer);
292        }
293    }
294
295    fn get_name(&self) -> PathBuf {
296        self.file_path.clone()
297    }
298}
299
impl<D: ChunkDecoder> Drop for GenericChunkedBinaryReader<D> {
    /// Removes (or keeps, per `remove_file`) the backing file when the reader
    /// is dropped.
    fn drop(&mut self) {
        // NOTE(review): `unwrap` here panics if removal fails; if this drop
        // runs during an unwind that becomes a double panic (abort).
        // Presumably removal of a memory-fs file cannot fail — confirm.
        MemoryFs::remove_file(&self.file_path, self.remove_file).unwrap();
    }
}