// base_d/encoders/streaming/decoder.rs
use std::io::{ErrorKind, Read, Write};

use crate::core::dictionary::Dictionary;
use crate::encoders::algorithms::DecodeError;
use crate::features::compression::CompressionAlgorithm;
use crate::features::hashing::HashAlgorithm;

use super::hasher::{HasherWriter, create_hasher_writer};

9const CHUNK_SIZE: usize = 4096; pub struct StreamingDecoder<'a, W: Write> {
17 dictionary: &'a Dictionary,
18 writer: W,
19 decompress_algo: Option<CompressionAlgorithm>,
20 hash_algo: Option<HashAlgorithm>,
21 xxhash_config: crate::features::hashing::XxHashConfig,
22}
23
24impl<'a, W: Write> StreamingDecoder<'a, W> {
25 pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32 StreamingDecoder {
33 dictionary,
34 writer,
35 decompress_algo: None,
36 hash_algo: None,
37 xxhash_config: crate::features::hashing::XxHashConfig::default(),
38 }
39 }
40
41 pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
43 self.decompress_algo = Some(algo);
44 self
45 }
46
47 pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
49 self.hash_algo = Some(algo);
50 self
51 }
52
53 pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
55 self.xxhash_config = config;
56 self
57 }
58
59 pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
67 if let Some(algo) = self.decompress_algo {
69 return self.decode_with_decompression(reader, algo);
70 }
71
72 match self.dictionary.mode() {
74 crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
75 crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
76 crate::core::config::EncodingMode::Radix => {
77 let mut buffer = String::new();
79 reader
80 .read_to_string(&mut buffer)
81 .map_err(|_| DecodeError::InvalidCharacter {
82 char: '\0',
83 position: 0,
84 input: String::new(),
85 valid_chars: String::new(),
86 })?;
87 let decoded = crate::encoders::algorithms::radix::decode(&buffer, self.dictionary)?;
88
89 let hash = self
90 .hash_algo
91 .map(|algo| crate::features::hashing::hash(&decoded, algo));
92
93 self.writer
94 .write_all(&decoded)
95 .map_err(|_| DecodeError::InvalidCharacter {
96 char: '\0',
97 position: 0,
98 input: String::new(),
99 valid_chars: String::new(),
100 })?;
101 Ok(hash)
102 }
103 }
104 }
105
106 fn decode_with_decompression<R: Read>(
108 &mut self,
109 reader: &mut R,
110 algo: CompressionAlgorithm,
111 ) -> Result<Option<Vec<u8>>, DecodeError> {
112 use std::io::Cursor;
113
114 let mut compressed_data = Vec::new();
116 {
117 let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
118 temp_decoder.decode(reader)?;
119 }
120
121 let mut cursor = Cursor::new(compressed_data);
123 let hash = self.decompress_stream(&mut cursor, algo).map_err(|_| {
124 DecodeError::InvalidCharacter {
125 char: '\0',
126 position: 0,
127 input: String::new(),
128 valid_chars: String::new(),
129 }
130 })?;
131
132 Ok(hash)
133 }
134
135 fn decompress_stream<R: Read>(
137 &mut self,
138 reader: &mut R,
139 algo: CompressionAlgorithm,
140 ) -> std::io::Result<Option<Vec<u8>>> {
141 use flate2::read::GzDecoder;
142
143 let mut hasher = self
144 .hash_algo
145 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
146
147 match algo {
148 CompressionAlgorithm::Gzip => {
149 let mut decoder = GzDecoder::new(reader);
150 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
151 }
152 #[cfg(feature = "native-compression")]
153 CompressionAlgorithm::Zstd => {
154 let mut decoder =
155 zstd::stream::read::Decoder::new(reader).map_err(std::io::Error::other)?;
156 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
157 }
158 #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
159 CompressionAlgorithm::Zstd => {
160 let mut decoder =
161 ruzstd::StreamingDecoder::new(reader).map_err(std::io::Error::other)?;
162 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
163 }
164 CompressionAlgorithm::Brotli => {
165 let mut decoder = brotli::Decompressor::new(reader, 4096);
166 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
167 }
168 #[cfg(feature = "native-compression")]
169 CompressionAlgorithm::Lzma => {
170 use xz2::read::XzDecoder;
171 let mut decoder = XzDecoder::new(reader);
172 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
173 }
174 #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
175 CompressionAlgorithm::Lzma => {
176 let mut compressed = Vec::new();
178 reader.read_to_end(&mut compressed)?;
179
180 use std::io::Cursor;
181 let mut decompressed = Vec::new();
182 lzma_rs::lzma_decompress(&mut Cursor::new(&compressed), &mut decompressed)
183 .map_err(std::io::Error::other)?;
184
185 let hash = self
186 .hash_algo
187 .map(|algo| crate::features::hashing::hash(&decompressed, algo));
188 self.writer.write_all(&decompressed)?;
189 return Ok(hash);
190 }
191 CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
192 let mut compressed = Vec::new();
194 reader.read_to_end(&mut compressed)?;
195
196 let decompressed = match algo {
197 #[cfg(feature = "native-compression")]
198 CompressionAlgorithm::Lz4 => {
199 lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
200 .map_err(std::io::Error::other)?
201 }
202 #[cfg(all(feature = "wasm", not(feature = "native-compression")))]
203 CompressionAlgorithm::Lz4 => lz4_flex::decompress_size_prepended(&compressed)
204 .map_err(std::io::Error::other)?,
205 CompressionAlgorithm::Snappy => {
206 let mut decoder = snap::raw::Decoder::new();
207 decoder
208 .decompress_vec(&compressed)
209 .map_err(std::io::Error::other)?
210 }
211 _ => unreachable!(),
212 };
213
214 let hash = self
215 .hash_algo
216 .map(|algo| crate::features::hashing::hash(&decompressed, algo));
217 self.writer.write_all(&decompressed)?;
218 return Ok(hash);
219 }
220 }
221
222 Ok(hasher.map(|h| h.finalize()))
223 }
224
225 fn copy_with_hash_to_writer<R: Read>(
226 reader: &mut R,
227 writer: &mut W,
228 hasher: &mut Option<HasherWriter>,
229 ) -> std::io::Result<()> {
230 let mut buffer = vec![0u8; CHUNK_SIZE];
231
232 loop {
233 let bytes_read = reader.read(&mut buffer)?;
234 if bytes_read == 0 {
235 break;
236 }
237
238 let chunk = &buffer[..bytes_read];
239 if let Some(h) = hasher {
240 h.update(chunk);
241 }
242 writer.write_all(chunk)?;
243 }
244
245 Ok(())
246 }
247
248 fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
249 let base = self.dictionary.base();
250 let bits_per_char = (base as f64).log2() as usize;
251 let chars_per_group = 8 / bits_per_char;
252
253 let mut text_buffer = String::new();
255 let mut char_buffer = vec![0u8; CHUNK_SIZE];
256 let mut hasher = self
257 .hash_algo
258 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
259
260 loop {
261 let bytes_read =
262 reader
263 .read(&mut char_buffer)
264 .map_err(|_| DecodeError::InvalidCharacter {
265 char: '\0',
266 position: 0,
267 input: String::new(),
268 valid_chars: String::new(),
269 })?;
270 if bytes_read == 0 {
271 break;
272 }
273
274 let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
275 DecodeError::InvalidCharacter {
276 char: '\0',
277 position: 0,
278 input: String::new(),
279 valid_chars: String::new(),
280 }
281 })?;
282 text_buffer.push_str(chunk_str);
283
284 let chars: Vec<char> = text_buffer.chars().collect();
286 let complete_groups = (chars.len() / chars_per_group) * chars_per_group;
287
288 if complete_groups > 0 {
289 let to_decode: String = chars[..complete_groups].iter().collect();
290 let decoded = crate::encoders::algorithms::chunked::decode_chunked(
291 &to_decode,
292 self.dictionary,
293 )?;
294
295 if let Some(ref mut h) = hasher {
296 h.update(&decoded);
297 }
298
299 self.writer
300 .write_all(&decoded)
301 .map_err(|_| DecodeError::InvalidCharacter {
302 char: '\0',
303 position: 0,
304 input: String::new(),
305 valid_chars: String::new(),
306 })?;
307
308 text_buffer = chars[complete_groups..].iter().collect();
310 }
311 }
312
313 if !text_buffer.is_empty() {
315 let decoded = crate::encoders::algorithms::chunked::decode_chunked(
316 &text_buffer,
317 self.dictionary,
318 )?;
319
320 if let Some(ref mut h) = hasher {
321 h.update(&decoded);
322 }
323
324 self.writer
325 .write_all(&decoded)
326 .map_err(|_| DecodeError::InvalidCharacter {
327 char: '\0',
328 position: 0,
329 input: String::new(),
330 valid_chars: String::new(),
331 })?;
332 }
333
334 Ok(hasher.map(|h| h.finalize()))
335 }
336
337 fn decode_byte_range<R: Read>(
338 &mut self,
339 reader: &mut R,
340 ) -> Result<Option<Vec<u8>>, DecodeError> {
341 let mut char_buffer = vec![0u8; CHUNK_SIZE];
342 let mut hasher = self
343 .hash_algo
344 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
345
346 loop {
347 let bytes_read =
348 reader
349 .read(&mut char_buffer)
350 .map_err(|_| DecodeError::InvalidCharacter {
351 char: '\0',
352 position: 0,
353 input: String::new(),
354 valid_chars: String::new(),
355 })?;
356 if bytes_read == 0 {
357 break;
358 }
359
360 let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read]).map_err(|_| {
361 DecodeError::InvalidCharacter {
362 char: '\0',
363 position: 0,
364 input: String::new(),
365 valid_chars: String::new(),
366 }
367 })?;
368
369 let decoded = crate::encoders::algorithms::byte_range::decode_byte_range(
370 chunk_str,
371 self.dictionary,
372 )?;
373
374 if let Some(ref mut h) = hasher {
375 h.update(&decoded);
376 }
377
378 self.writer
379 .write_all(&decoded)
380 .map_err(|_| DecodeError::InvalidCharacter {
381 char: '\0',
382 position: 0,
383 input: String::new(),
384 valid_chars: String::new(),
385 })?;
386 }
387
388 Ok(hasher.map(|h| h.finalize()))
389 }
390}