base_d/encoders/streaming/
decoder.rs1use crate::core::dictionary::Dictionary;
2use crate::encoders::algorithms::DecodeError;
3use crate::features::compression::CompressionAlgorithm;
4use crate::features::hashing::HashAlgorithm;
5use std::io::{Read, Write};
6
7use super::hasher::{create_hasher_writer, HasherWriter};
8
9const CHUNK_SIZE: usize = 4096; pub struct StreamingDecoder<'a, W: Write> {
17 dictionary: &'a Dictionary,
18 writer: W,
19 decompress_algo: Option<CompressionAlgorithm>,
20 hash_algo: Option<HashAlgorithm>,
21 xxhash_config: crate::features::hashing::XxHashConfig,
22}
23
24impl<'a, W: Write> StreamingDecoder<'a, W> {
25 pub fn new(dictionary: &'a Dictionary, writer: W) -> Self {
32 StreamingDecoder {
33 dictionary,
34 writer,
35 decompress_algo: None,
36 hash_algo: None,
37 xxhash_config: crate::features::hashing::XxHashConfig::default(),
38 }
39 }
40
41 pub fn with_decompression(mut self, algo: CompressionAlgorithm) -> Self {
43 self.decompress_algo = Some(algo);
44 self
45 }
46
47 pub fn with_hashing(mut self, algo: HashAlgorithm) -> Self {
49 self.hash_algo = Some(algo);
50 self
51 }
52
53 pub fn with_xxhash_config(mut self, config: crate::features::hashing::XxHashConfig) -> Self {
55 self.xxhash_config = config;
56 self
57 }
58
59 pub fn decode<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
67 if let Some(algo) = self.decompress_algo {
69 return self.decode_with_decompression(reader, algo);
70 }
71
72 match self.dictionary.mode() {
74 crate::core::config::EncodingMode::Chunked => self.decode_chunked(reader),
75 crate::core::config::EncodingMode::ByteRange => self.decode_byte_range(reader),
76 crate::core::config::EncodingMode::BaseConversion => {
77 let mut buffer = String::new();
79 reader
80 .read_to_string(&mut buffer)
81 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
82 let decoded = crate::encoders::algorithms::math::decode(&buffer, self.dictionary)?;
83
84 let hash = self
85 .hash_algo
86 .map(|algo| crate::features::hashing::hash(&decoded, algo));
87
88 self.writer
89 .write_all(&decoded)
90 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
91 Ok(hash)
92 }
93 }
94 }
95
96 fn decode_with_decompression<R: Read>(
98 &mut self,
99 reader: &mut R,
100 algo: CompressionAlgorithm,
101 ) -> Result<Option<Vec<u8>>, DecodeError> {
102 use std::io::Cursor;
103
104 let mut compressed_data = Vec::new();
106 {
107 let mut temp_decoder = StreamingDecoder::new(self.dictionary, &mut compressed_data);
108 temp_decoder.decode(reader)?;
109 }
110
111 let mut cursor = Cursor::new(compressed_data);
113 let hash = self
114 .decompress_stream(&mut cursor, algo)
115 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
116
117 Ok(hash)
118 }
119
120 fn decompress_stream<R: Read>(
122 &mut self,
123 reader: &mut R,
124 algo: CompressionAlgorithm,
125 ) -> std::io::Result<Option<Vec<u8>>> {
126 use flate2::read::GzDecoder;
127 use xz2::read::XzDecoder;
128
129 let mut hasher = self
130 .hash_algo
131 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
132
133 match algo {
134 CompressionAlgorithm::Gzip => {
135 let mut decoder = GzDecoder::new(reader);
136 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
137 }
138 CompressionAlgorithm::Zstd => {
139 let mut decoder = zstd::stream::read::Decoder::new(reader)
140 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
141 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
142 }
143 CompressionAlgorithm::Brotli => {
144 let mut decoder = brotli::Decompressor::new(reader, 4096);
145 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
146 }
147 CompressionAlgorithm::Lzma => {
148 let mut decoder = XzDecoder::new(reader);
149 Self::copy_with_hash_to_writer(&mut decoder, &mut self.writer, &mut hasher)?;
150 }
151 CompressionAlgorithm::Lz4 | CompressionAlgorithm::Snappy => {
152 let mut compressed = Vec::new();
154 reader.read_to_end(&mut compressed)?;
155
156 let decompressed = match algo {
157 CompressionAlgorithm::Lz4 => {
158 lz4::block::decompress(&compressed, Some(100 * 1024 * 1024))
159 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
160 }
161 CompressionAlgorithm::Snappy => {
162 let mut decoder = snap::raw::Decoder::new();
163 decoder
164 .decompress_vec(&compressed)
165 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
166 }
167 _ => unreachable!(),
168 };
169
170 let hash = self
171 .hash_algo
172 .map(|algo| crate::features::hashing::hash(&decompressed, algo));
173 self.writer.write_all(&decompressed)?;
174 return Ok(hash);
175 }
176 }
177
178 Ok(hasher.map(|h| h.finalize()))
179 }
180
181 fn copy_with_hash_to_writer<R: Read>(
182 reader: &mut R,
183 writer: &mut W,
184 hasher: &mut Option<HasherWriter>,
185 ) -> std::io::Result<()> {
186 let mut buffer = vec![0u8; CHUNK_SIZE];
187
188 loop {
189 let bytes_read = reader.read(&mut buffer)?;
190 if bytes_read == 0 {
191 break;
192 }
193
194 let chunk = &buffer[..bytes_read];
195 if let Some(ref mut h) = hasher {
196 h.update(chunk);
197 }
198 writer.write_all(chunk)?;
199 }
200
201 Ok(())
202 }
203
204 fn decode_chunked<R: Read>(&mut self, reader: &mut R) -> Result<Option<Vec<u8>>, DecodeError> {
205 let base = self.dictionary.base();
206 let bits_per_char = (base as f64).log2() as usize;
207 let chars_per_group = 8 / bits_per_char;
208
209 let mut text_buffer = String::new();
211 let mut char_buffer = vec![0u8; CHUNK_SIZE];
212 let mut hasher = self
213 .hash_algo
214 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
215
216 loop {
217 let bytes_read = reader
218 .read(&mut char_buffer)
219 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
220 if bytes_read == 0 {
221 break;
222 }
223
224 let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read])
225 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
226 text_buffer.push_str(chunk_str);
227
228 let chars: Vec<char> = text_buffer.chars().collect();
230 let complete_groups = (chars.len() / chars_per_group) * chars_per_group;
231
232 if complete_groups > 0 {
233 let to_decode: String = chars[..complete_groups].iter().collect();
234 let decoded = crate::encoders::algorithms::chunked::decode_chunked(
235 &to_decode,
236 self.dictionary,
237 )?;
238
239 if let Some(ref mut h) = hasher {
240 h.update(&decoded);
241 }
242
243 self.writer
244 .write_all(&decoded)
245 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
246
247 text_buffer = chars[complete_groups..].iter().collect();
249 }
250 }
251
252 if !text_buffer.is_empty() {
254 let decoded = crate::encoders::algorithms::chunked::decode_chunked(
255 &text_buffer,
256 self.dictionary,
257 )?;
258
259 if let Some(ref mut h) = hasher {
260 h.update(&decoded);
261 }
262
263 self.writer
264 .write_all(&decoded)
265 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
266 }
267
268 Ok(hasher.map(|h| h.finalize()))
269 }
270
271 fn decode_byte_range<R: Read>(
272 &mut self,
273 reader: &mut R,
274 ) -> Result<Option<Vec<u8>>, DecodeError> {
275 let mut char_buffer = vec![0u8; CHUNK_SIZE];
276 let mut hasher = self
277 .hash_algo
278 .map(|algo| create_hasher_writer(algo, &self.xxhash_config));
279
280 loop {
281 let bytes_read = reader
282 .read(&mut char_buffer)
283 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
284 if bytes_read == 0 {
285 break;
286 }
287
288 let chunk_str = std::str::from_utf8(&char_buffer[..bytes_read])
289 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
290
291 let decoded = crate::encoders::algorithms::byte_range::decode_byte_range(
292 chunk_str,
293 self.dictionary,
294 )?;
295
296 if let Some(ref mut h) = hasher {
297 h.update(&decoded);
298 }
299
300 self.writer
301 .write_all(&decoded)
302 .map_err(|_| DecodeError::InvalidCharacter('\0'))?;
303 }
304
305 Ok(hasher.map(|h| h.finalize()))
306 }
307}