base_d/encoders/streaming/
decoder.rs1use crate::core::dictionary::Dictionary;
2use crate::encoders::algorithms::DecodeError;
3use crate::features::compression::CompressionAlgorithm;
4use crate::features::hashing::HashAlgorithm;
5use std::io::{Read, Write};
6
7use super::hasher::{HasherWriter, create_hasher_writer};
8
9const CHUNK_SIZE: usize = 4096; pub struct StreamingDecoder<'a, W: Write> {
17 dictionary: &'a Dictionary,
18 writer: W,
19 decompress_algo: Option<CompressionAlgorithm>,
20 hash_algo: Option<HashAlgorithm>,
21 xxhash_config: crate::features::hashing::XxHashConfig,
22}
23
24impl<'a, W: Write> StreamingDecoder<'a, W> {
25 pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32 StreamingDecoder {
33 dictionary,
34 writer,
35 decompress_algo: None,
36 hash_algo: None,
37 xxhash_config: crate::features::hashing::XxHashConfig::default(),
38 }
39 }
40
41 pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
43 self.decompress_algo = Some(algo);
44 self
45 }
46
47 pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
49 self.hash_algo = Some(algo);
50 self
51 }
52
53 pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
55 self.xxhash_config = config;
56 self
57 }
58
59 pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
67 if let Some(algo) = self.decompress_algo {
69 return self.decode_with_decompression(reader, algo);
70 }
71
72 match self.dictionary.mode() {
74 crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
75 crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
76 crate::core::config::EncodingMode::BaseConversion => {
77 let mut buffer = String::new();
79 reader
80 .read_to_string(&mut buffer)
81 .map_err(|_| DecodeError::InvalidCharacter {
82 char: '\0',
83 position: 0,
84 input: String::new(),
85 valid_chars: String::new(),
86 })?;
87 let decoded = crate::encoders::algorithms::math::decode(&buffer, self.dictionary)?;
88
89 let hash = self
90 .hash_algo
91 .map(|algo| crate::features::hashing::hash(&decoded, algo));
92
93 self.writer
94 .write_all(&decoded)
95 .map_err(|_| DecodeError::InvalidCharacter {
96 char: '\0',
97 position: 0,
98 input: String::new(),
99 valid_chars: String::new(),
100 })?;
101 Ok(hash)
102 }
103 }
104 }
105
106 fn decode_with_decompression<R: Read>(
108 &mut self,
109 reader: &mut R,
110 algo: CompressionAlgorithm,
111 ) -> Result<Option<Vec<u8>>, DecodeError> {
112 use std::io::Cursor;
113
114 let mut compressed_data = Vec::new();
116 {
117 let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
118 temp_decoder.decode(reader)?;
119 }
120
121 let mut cursor = Cursor::new(compressed_data);
123 let hash = self.decompress_stream(&mut cursor, algo).map_err(|_| {
124 DecodeError::InvalidCharacter {
125 char: '\0',
126 position: 0,
127 input: String::new(),
128 valid_chars: String::new(),
129 }
130 })?;
131
132 Ok(hash)
133 }
134
135 fn decompress_stream<R: Read>(
137 &mut self,
138 reader: &mut R,
139 algo: CompressionAlgorithm,
140 ) -> std::io::Result<Option<Vec<u8>>> {
141 use flate2::read::GzDecoder;
142 use xz2::read::XzDecoder;
143
144 let mut hasher = self
145 .hash_algo
146 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
147
148 match algo {
149 CompressionAlgorithm::Gzip => {
150 let mut decoder = GzDecoder::new(reader);
151 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
152 }
153 CompressionAlgorithm::Zstd => {
154 let mut decoder =
155 zstd::stream::read::Decoder::new(reader).map_err(std::io::Error::other)?;
156 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
157 }
158 CompressionAlgorithm::Brotli => {
159 let mut decoder = brotli::Decompressor::new(reader, 4096);
160 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
161 }
162 CompressionAlgorithm::Lzma => {
163 let mut decoder = XzDecoder::new(reader);
164 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
165 }
166 CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
167 let mut compressed = Vec::new();
169 reader.read_to_end(&mut compressed)?;
170
171 let decompressed = match algo {
172 CompressionAlgorithm::Lz4 => {
173 lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
174 .map_err(std::io::Error::other)?
175 }
176 CompressionAlgorithm::Snappy => {
177 let mut decoder = snap::raw::Decoder::new();
178 decoder
179 .decompress_vec(&compressed)
180 .map_err(std::io::Error::other)?
181 }
182 _ => unreachable!(),
183 };
184
185 let hash = self
186 .hash_algo
187 .map(|algo| crate::features::hashing::hash(&decompressed, algo));
188 self.writer.write_all(&decompressed)?;
189 return Ok(hash);
190 }
191 }
192
193 Ok(hasher.map(|h| h.finalize()))
194 }
195
196 fn copy_with_hash_to_writer<R: Read>(
197 reader: &mut R,
198 writer: &mut W,
199 hasher: &mut Option<HasherWriter>,
200 ) -> std::io::Result<()> {
201 let mut buffer = vec![0u8; CHUNK_SIZE];
202
203 loop {
204 let bytes_read = reader.read(&mut buffer)?;
205 if bytes_read == 0 {
206 break;
207 }
208
209 let chunk = &buffer[..bytes_read];
210 if let Some(h) = hasher {
211 h.update(chunk);
212 }
213 writer.write_all(chunk)?;
214 }
215
216 Ok(())
217 }
218
219 fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
220 let base = self.dictionary.base();
221 let bits_per_char = (base as f64).log2() as usize;
222 let chars_per_group = 8 / bits_per_char;
223
224 let mut text_buffer = String::new();
226 let mut char_buffer = vec![0u8; CHUNK_SIZE];
227 let mut hasher = self
228 .hash_algo
229 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
230
231 loop {
232 let bytes_read =
233 reader
234 .read(&mut char_buffer)
235 .map_err(|_| DecodeError::InvalidCharacter {
236 char: '\0',
237 position: 0,
238 input: String::new(),
239 valid_chars: String::new(),
240 })?;
241 if bytes_read == 0 {
242 break;
243 }
244
245 let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
246 DecodeError::InvalidCharacter {
247 char: '\0',
248 position: 0,
249 input: String::new(),
250 valid_chars: String::new(),
251 }
252 })?;
253 text_buffer.push_str(chunk_str);
254
255 let chars: Vec<char> = text_buffer.chars().collect();
257 let complete_groups = (chars.len() / chars_per_group) * chars_per_group;
258
259 if complete_groups > 0 {
260 let to_decode: String = chars[..complete_groups].iter().collect();
261 let decoded = crate::encoders::algorithms::chunked::decode_chunked(
262 &to_decode,
263 self.dictionary,
264 )?;
265
266 if let Some(ref mut h) = hasher {
267 h.update(&decoded);
268 }
269
270 self.writer
271 .write_all(&decoded)
272 .map_err(|_| DecodeError::InvalidCharacter {
273 char: '\0',
274 position: 0,
275 input: String::new(),
276 valid_chars: String::new(),
277 })?;
278
279 text_buffer = chars[complete_groups..].iter().collect();
281 }
282 }
283
284 if !text_buffer.is_empty() {
286 let decoded = crate::encoders::algorithms::chunked::decode_chunked(
287 &text_buffer,
288 self.dictionary,
289 )?;
290
291 if let Some(ref mut h) = hasher {
292 h.update(&decoded);
293 }
294
295 self.writer
296 .write_all(&decoded)
297 .map_err(|_| DecodeError::InvalidCharacter {
298 char: '\0',
299 position: 0,
300 input: String::new(),
301 valid_chars: String::new(),
302 })?;
303 }
304
305 Ok(hasher.map(|h| h.finalize()))
306 }
307
308 fn decode_byte_range<R: Read>(
309 &mut self,
310 reader: &mut R,
311 ) -> Result<Option<Vec<u8>>, DecodeError> {
312 let mut char_buffer = vec![0u8; CHUNK_SIZE];
313 let mut hasher = self
314 .hash_algo
315 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
316
317 loop {
318 let bytes_read =
319 reader
320 .read(&mut char_buffer)
321 .map_err(|_| DecodeError::InvalidCharacter {
322 char: '\0',
323 position: 0,
324 input: String::new(),
325 valid_chars: String::new(),
326 })?;
327 if bytes_read == 0 {
328 break;
329 }
330
331 let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
332 DecodeError::InvalidCharacter {
333 char: '\0',
334 position: 0,
335 input: String::new(),
336 valid_chars: String::new(),
337 }
338 })?;
339
340 let decoded = crate::encoders::algorithms::byte_range::decode_byte_range(
341 chunk_str,
342 self.dictionary,
343 )?;
344
345 if let Some(ref mut h) = hasher {
346 h.update(&decoded);
347 }
348
349 self.writer
350 .write_all(&decoded)
351 .map_err(|_| DecodeError::InvalidCharacter {
352 char: '\0',
353 position: 0,
354 input: String::new(),
355 valid_chars: String::new(),
356 })?;
357 }
358
359 Ok(hasher.map(|h| h.finalize()))
360 }
361}