parallel_processor/buckets/readers/
binary_reader.rs1use crate::buckets::readers::compressed_decoder::CompressedStreamDecoder;
2use crate::buckets::readers::lock_free_decoder::LockFreeStreamDecoder;
3use crate::buckets::writers::{BucketCheckpoints, BucketHeader};
4use crate::memory_fs::file::reader::{FileRangeReference, FileReader};
5use crate::memory_fs::{MemoryFs, RemoveFileMode};
6use crate::DEFAULT_BINCODE_CONFIG;
7use bincode::Decode;
8use desse::Desse;
9use desse::DesseSized;
10use parking_lot::Mutex;
11use std::collections::VecDeque;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom};
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16
17pub trait ChunkDecoder {
18 const MAGIC_HEADER: &'static [u8; 16];
19 type ReaderType: Read;
20 fn decode_stream(reader: FileReader, size: u64) -> Self::ReaderType;
21 fn dispose_stream(stream: Self::ReaderType) -> FileReader;
22}
23
24pub(crate) struct RemoveFileGuard {
25 path: PathBuf,
26 remove_mode: RemoveFileMode,
27}
28
29impl Drop for RemoveFileGuard {
30 fn drop(&mut self) {
31 MemoryFs::remove_file(&self.path, self.remove_mode).unwrap();
32 }
33}
34
35#[derive(Clone)]
36pub struct BinaryReaderChunk {
37 pub(crate) reader: FileReader,
38 pub(crate) length: u64,
39 pub(crate) remove_guard: Arc<RemoveFileGuard>,
40 pub(crate) extra_data: Option<Vec<u8>>,
41 pub(crate) decoder_type: DecoderType,
42}
43
44impl Debug for BinaryReaderChunk {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 f.debug_struct("BinaryReaderChunk")
47 .field("length", &self.length)
48 .field("extra_data", &self.extra_data)
49 .field("decoder_type", &self.decoder_type)
50 .field("file_name", &self.reader.get_file_path().display())
51 .finish()
52 }
53}
54
55impl BinaryReaderChunk {
56 pub fn get_extra_data<E: Decode<()>>(&self) -> Option<E> {
57 self.extra_data.as_ref().map(|data| {
58 bincode::decode_from_slice(&data, DEFAULT_BINCODE_CONFIG)
59 .unwrap()
60 .0
61 })
62 }
63
64 pub fn get_decoder_type(&self) -> DecoderType {
65 self.decoder_type
66 }
67
68 pub fn get_unique_file_id(&self) -> usize {
69 self.reader.get_unique_file_id()
70 }
71
72 pub fn get_length(&self) -> u64 {
73 self.length
74 }
75}
76
/// Kind of stream decoder required to read the chunks of a bucket file.
///
/// The value is chosen from the magic bytes stored in the bucket header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecoderType {
    /// The header magic matches the lock-free stream decoder.
    LockFree,
    /// The header magic matches the compressed stream decoder.
    Compressed,
}
82
83pub struct ChunkedBinaryReaderIndex {
84 file_path: PathBuf,
85 chunks: Vec<BinaryReaderChunk>,
86 format_data_info: Vec<u8>,
87 file_size: u64,
88}
89
90pub enum DecodeItemsStatus<T> {
91 Decompressed,
92 Passtrough {
93 file_range: FileRangeReference,
94 data: Option<T>,
95 },
96}
97
98impl ChunkedBinaryReaderIndex {
99 fn get_chunk_size(
100 checkpoints: &BucketCheckpoints,
101 last_byte_position: u64,
102 index: usize,
103 ) -> u64 {
104 if checkpoints.index.len() > (index + 1) as usize {
105 checkpoints.index[(index + 1) as usize].offset
106 - checkpoints.index[index as usize].offset
107 } else {
108 last_byte_position - checkpoints.index[index as usize].offset
109 }
110 }
111
112 pub fn get_file_size(&self) -> u64 {
113 self.file_size
114 }
115
116 pub fn from_file(
117 name: impl AsRef<Path>,
118 remove_file: RemoveFileMode,
119 prefetch_amount: Option<usize>,
120 ) -> Self {
121 let mut file = FileReader::open(&name, prefetch_amount)
122 .unwrap_or_else(|| panic!("Cannot open file {}", name.as_ref().display()));
123
124 let mut header_buffer = [0; BucketHeader::SIZE];
125 file.read_exact(&mut header_buffer)
126 .unwrap_or_else(|_| panic!("File {} is corrupted", name.as_ref().display()));
127
128 let header: BucketHeader = BucketHeader::deserialize_from(&header_buffer);
129
130 file.seek(SeekFrom::Start(header.index_offset)).unwrap();
131 let index: BucketCheckpoints =
132 bincode::decode_from_std_read(&mut file, DEFAULT_BINCODE_CONFIG).unwrap();
133
134 let remove_guard = Arc::new(RemoveFileGuard {
135 path: name.as_ref().to_path_buf(),
136 remove_mode: remove_file,
137 });
138
139 let mut chunks = Vec::with_capacity(index.index.len());
140
141 let decoder_type = match &header.magic {
142 LockFreeStreamDecoder::MAGIC_HEADER => DecoderType::LockFree,
143 CompressedStreamDecoder::MAGIC_HEADER => DecoderType::Compressed,
144 _ => panic!(
145 "Invalid decode bucket header magic: {:?}. This is a bug",
146 header.magic
147 ),
148 };
149
150 if index.index.len() > 0 {
151 file.seek(SeekFrom::Start(index.index[0].offset)).unwrap();
152
153 for (chunk_idx, chunk) in index.index.iter().enumerate() {
154 let length = Self::get_chunk_size(&index, header.index_offset, chunk_idx);
155
156 chunks.push(BinaryReaderChunk {
157 reader: file.clone(),
158 length,
159 remove_guard: remove_guard.clone(),
160 extra_data: chunk.data.clone(),
161 decoder_type,
162 });
163
164 file.seek(SeekFrom::Current(length as i64)).unwrap();
165 }
166 }
167
168 Self {
169 file_path: name.as_ref().to_path_buf(),
170 chunks,
171 format_data_info: header.data_format_info.to_vec(),
172 file_size: MemoryFs::get_file_size(name).unwrap() as u64,
173 }
174 }
175
176 pub fn get_data_format_info<T: Decode<()>>(&self) -> T {
177 bincode::decode_from_slice(&self.format_data_info, DEFAULT_BINCODE_CONFIG)
178 .unwrap()
179 .0
180 }
181
182 pub fn get_path(&self) -> &Path {
183 &self.file_path
184 }
185
186 pub fn into_chunks(self) -> Vec<BinaryReaderChunk> {
187 self.chunks
188 }
189
190 pub fn into_parallel_chunks(self) -> Mutex<VecDeque<BinaryReaderChunk>> {
191 Mutex::new(self.chunks.into())
192 }
193}
194
195pub struct BinaryChunkReader<D: ChunkDecoder> {
196 reader: D::ReaderType,
197 _remove_guard: Arc<RemoveFileGuard>,
198}
199
200impl<D: ChunkDecoder> BinaryChunkReader<D> {
201 pub fn new(chunk: BinaryReaderChunk) -> Self {
202 Self {
203 reader: D::decode_stream(chunk.reader, chunk.length),
204 _remove_guard: chunk.remove_guard,
205 }
206 }
207}
208
209impl<D: ChunkDecoder> Read for BinaryChunkReader<D> {
210 #[inline(always)]
211 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
212 self.reader.read(buf)
213 }
214}