parallel_processor/buckets/readers/binary_reader.rs

1use crate::buckets::readers::compressed_decoder::CompressedStreamDecoder;
2use crate::buckets::readers::lock_free_decoder::LockFreeStreamDecoder;
3use crate::buckets::writers::{BucketCheckpoints, BucketHeader};
4use crate::memory_fs::file::reader::{FileRangeReference, FileReader};
5use crate::memory_fs::{MemoryFs, RemoveFileMode};
6use crate::DEFAULT_BINCODE_CONFIG;
7use bincode::Decode;
8use desse::Desse;
9use desse::DesseSized;
10use parking_lot::Mutex;
11use std::collections::VecDeque;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom};
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
17pub trait ChunkDecoder {
18    const MAGIC_HEADER: &'static [u8; 16];
19    type ReaderType: Read;
20    fn decode_stream(reader: FileReader, size: u64) -> Self::ReaderType;
21    fn dispose_stream(stream: Self::ReaderType) -> FileReader;
22}
23
24pub(crate) struct RemoveFileGuard {
25    path: PathBuf,
26    remove_mode: RemoveFileMode,
27}
28
29impl Drop for RemoveFileGuard {
30    fn drop(&mut self) {
31        MemoryFs::remove_file(&self.path, self.remove_mode).unwrap();
32    }
33}
34
35#[derive(Clone)]
36pub struct BinaryReaderChunk {
37    pub(crate) reader: FileReader,
38    pub(crate) length: u64,
39    pub(crate) remove_guard: Arc<RemoveFileGuard>,
40    pub(crate) extra_data: Option<Vec<u8>>,
41    pub(crate) decoder_type: DecoderType,
42}
43
44impl Debug for BinaryReaderChunk {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        f.debug_struct("BinaryReaderChunk")
47            .field("length", &self.length)
48            .field("extra_data", &self.extra_data)
49            .field("decoder_type", &self.decoder_type)
50            .field("file_name", &self.reader.get_file_path().display())
51            .finish()
52    }
53}
54
55impl BinaryReaderChunk {
56    pub fn get_extra_data<E: Decode<()>>(&self) -> Option<E> {
57        self.extra_data.as_ref().map(|data| {
58            bincode::decode_from_slice(&data, DEFAULT_BINCODE_CONFIG)
59                .unwrap()
60                .0
61        })
62    }
63
64    pub fn get_decoder_type(&self) -> DecoderType {
65        self.decoder_type
66    }
67
68    pub fn get_unique_file_id(&self) -> usize {
69        self.reader.get_unique_file_id()
70    }
71
72    pub fn get_length(&self) -> u64 {
73        self.length
74    }
75}
76
/// Encoding of a bucket file, determined from the magic bytes of its header.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DecoderType {
    /// Header magic equals `LockFreeStreamDecoder::MAGIC_HEADER`.
    LockFree,
    /// Header magic equals `CompressedStreamDecoder::MAGIC_HEADER`.
    Compressed,
}
82
83pub struct ChunkedBinaryReaderIndex {
84    file_path: PathBuf,
85    chunks: Vec<BinaryReaderChunk>,
86    format_data_info: Vec<u8>,
87    file_size: u64,
88}
89
90pub enum DecodeItemsStatus<T> {
91    Decompressed,
92    Passtrough {
93        file_range: FileRangeReference,
94        data: Option<T>,
95    },
96}
97
98impl ChunkedBinaryReaderIndex {
99    fn get_chunk_size(
100        checkpoints: &BucketCheckpoints,
101        last_byte_position: u64,
102        index: usize,
103    ) -> u64 {
104        if checkpoints.index.len() > (index + 1) as usize {
105            checkpoints.index[(index + 1) as usize].offset
106                - checkpoints.index[index as usize].offset
107        } else {
108            last_byte_position - checkpoints.index[index as usize].offset
109        }
110    }
111
112    pub fn get_file_size(&self) -> u64 {
113        self.file_size
114    }
115
116    pub fn from_file(
117        name: impl AsRef<Path>,
118        remove_file: RemoveFileMode,
119        prefetch_amount: Option<usize>,
120    ) -> Self {
121        let mut file = FileReader::open(&name, prefetch_amount)
122            .unwrap_or_else(|| panic!("Cannot open file {}", name.as_ref().display()));
123
124        let mut header_buffer = [0; BucketHeader::SIZE];
125        file.read_exact(&mut header_buffer)
126            .unwrap_or_else(|_| panic!("File {} is corrupted", name.as_ref().display()));
127
128        let header: BucketHeader = BucketHeader::deserialize_from(&header_buffer);
129
130        file.seek(SeekFrom::Start(header.index_offset)).unwrap();
131        let index: BucketCheckpoints =
132            bincode::decode_from_std_read(&mut file, DEFAULT_BINCODE_CONFIG).unwrap();
133
134        let remove_guard = Arc::new(RemoveFileGuard {
135            path: name.as_ref().to_path_buf(),
136            remove_mode: remove_file,
137        });
138
139        let mut chunks = Vec::with_capacity(index.index.len());
140
141        let decoder_type = match &header.magic {
142            LockFreeStreamDecoder::MAGIC_HEADER => DecoderType::LockFree,
143            CompressedStreamDecoder::MAGIC_HEADER => DecoderType::Compressed,
144            _ => panic!(
145                "Invalid decode bucket header magic: {:?}. This is a bug",
146                header.magic
147            ),
148        };
149
150        if index.index.len() > 0 {
151            file.seek(SeekFrom::Start(index.index[0].offset)).unwrap();
152
153            for (chunk_idx, chunk) in index.index.iter().enumerate() {
154                let length = Self::get_chunk_size(&index, header.index_offset, chunk_idx);
155
156                chunks.push(BinaryReaderChunk {
157                    reader: file.clone(),
158                    length,
159                    remove_guard: remove_guard.clone(),
160                    extra_data: chunk.data.clone(),
161                    decoder_type,
162                });
163
164                file.seek(SeekFrom::Current(length as i64)).unwrap();
165            }
166        }
167
168        Self {
169            file_path: name.as_ref().to_path_buf(),
170            chunks,
171            format_data_info: header.data_format_info.to_vec(),
172            file_size: MemoryFs::get_file_size(name).unwrap() as u64,
173        }
174    }
175
176    pub fn get_data_format_info<T: Decode<()>>(&self) -> T {
177        bincode::decode_from_slice(&self.format_data_info, DEFAULT_BINCODE_CONFIG)
178            .unwrap()
179            .0
180    }
181
182    pub fn get_path(&self) -> &Path {
183        &self.file_path
184    }
185
186    pub fn into_chunks(self) -> Vec<BinaryReaderChunk> {
187        self.chunks
188    }
189
190    pub fn into_parallel_chunks(self) -> Mutex<VecDeque<BinaryReaderChunk>> {
191        Mutex::new(self.chunks.into())
192    }
193}
194
195pub struct BinaryChunkReader<D: ChunkDecoder> {
196    reader: D::ReaderType,
197    _remove_guard: Arc<RemoveFileGuard>,
198}
199
200impl<D: ChunkDecoder> BinaryChunkReader<D> {
201    pub fn new(chunk: BinaryReaderChunk) -> Self {
202        Self {
203            reader: D::decode_stream(chunk.reader, chunk.length),
204            _remove_guard: chunk.remove_guard,
205        }
206    }
207}
208
209impl<D: ChunkDecoder> Read for BinaryChunkReader<D> {
210    #[inline(always)]
211    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
212        self.reader.read(buf)
213    }
214}